commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20#![doc(
21    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22    html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use bytes::{Buf, BufMut};
26use commonware_macros::select;
27use commonware_parallel::{Rayon, ThreadPool};
28use commonware_utils::StableBuf;
29use prometheus_client::registry::Metric;
30use rayon::ThreadPoolBuildError;
31use std::{
32    future::Future,
33    io::Error as IoError,
34    net::SocketAddr,
35    num::NonZeroUsize,
36    time::{Duration, SystemTime},
37};
38use thiserror::Error;
39
40#[macro_use]
41mod macros;
42
43pub mod deterministic;
44pub mod mocks;
45cfg_if::cfg_if! {
46    if #[cfg(not(target_arch = "wasm32"))] {
47        pub mod tokio;
48        pub mod benchmarks;
49    }
50}
51mod network;
52mod process;
53mod storage;
54pub mod telemetry;
55pub mod utils;
56pub use utils::*;
57#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
58mod iouring;
59
/// Label prefix reserved for metrics emitted by the runtime itself.
///
/// [`Metrics::scoped_label`] refuses to produce a label that starts with this value.
const METRICS_PREFIX: &str = "runtime";

/// [`Blob`] version requested by [`Storage::open`], which takes no explicit version.
pub const DEFAULT_BLOB_VERSION: u16 = 0;
65
66/// Errors that can occur when interacting with the runtime.
67#[derive(Error, Debug)]
68pub enum Error {
69    #[error("exited")]
70    Exited,
71    #[error("closed")]
72    Closed,
73    #[error("timeout")]
74    Timeout,
75    #[error("bind failed")]
76    BindFailed,
77    #[error("connection failed")]
78    ConnectionFailed,
79    #[error("write failed")]
80    WriteFailed,
81    #[error("read failed")]
82    ReadFailed,
83    #[error("send failed")]
84    SendFailed,
85    #[error("recv failed")]
86    RecvFailed,
87    #[error("dns resolution failed: {0}")]
88    ResolveFailed(String),
89    #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
90    PartitionNameInvalid(String),
91    #[error("partition creation failed: {0}")]
92    PartitionCreationFailed(String),
93    #[error("partition missing: {0}")]
94    PartitionMissing(String),
95    #[error("partition corrupt: {0}")]
96    PartitionCorrupt(String),
97    #[error("blob open failed: {0}/{1} error: {2}")]
98    BlobOpenFailed(String, String, IoError),
99    #[error("blob missing: {0}/{1}")]
100    BlobMissing(String, String),
101    #[error("blob resize failed: {0}/{1} error: {2}")]
102    BlobResizeFailed(String, String, IoError),
103    #[error("blob sync failed: {0}/{1} error: {2}")]
104    BlobSyncFailed(String, String, IoError),
105    #[error("blob insufficient length")]
106    BlobInsufficientLength,
107    #[error("blob corrupt: {0}/{1} reason: {2}")]
108    BlobCorrupt(String, String, String),
109    #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
110    BlobVersionMismatch {
111        expected: std::ops::RangeInclusive<u16>,
112        found: u16,
113    },
114    #[error("invalid or missing checksum")]
115    InvalidChecksum,
116    #[error("offset overflow")]
117    OffsetOverflow,
118    #[error("immutable blob")]
119    ImmutableBlob,
120    #[error("io error: {0}")]
121    Io(#[from] IoError),
122}
123
/// Entry point of a runtime: the interface a task scheduler implements so that
/// a root task can be started on it.
pub trait Runner {
    /// The environment made available to tasks.
    ///
    /// Services a context may provide include, for example:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Run the root task built by `f` and return that task's output.
    ///
    /// Every spawned task is canceled once this function returns. When `Drop` is
    /// not enough to shut down cleanly, coordinate shutdown with [Spawner::stop]
    /// and [Spawner::stopped] instead.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
144
145/// Interface that any task scheduler must implement to spawn tasks.
146pub trait Spawner: Clone + Send + Sync + 'static {
147    /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
148    ///
149    /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
150    /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
151    /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
152    /// [`Spawner::dedicated`] instead.
153    ///
154    /// The shared executor with `blocking == false` is the default spawn mode.
155    fn shared(self, blocking: bool) -> Self;
156
157    /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
158    ///
159    /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
160    /// shared executor.
161    ///
162    /// This is not the default behavior. See [`Spawner::shared`] for more information.
163    fn dedicated(self) -> Self;
164
165    /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
166    fn instrumented(self) -> Self;
167
168    /// Spawn a task with the current context.
169    ///
170    /// Unlike directly awaiting a future, the task starts running immediately even if the caller
171    /// never awaits the returned [`Handle`].
172    ///
173    /// # Mandatory Supervision
174    ///
175    /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
176    ///
177    /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
178    /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
179    ///
180    /// ```txt
181    /// ctx_a
182    ///   |
183    ///   +-- clone() ---> ctx_c
184    ///   |                  |
185    ///   |                  +-- spawn() ---> Task C (ctx_d)
186    ///   |
187    ///   +-- spawn() ---> Task A (ctx_b)
188    ///                              |
189    ///                              +-- spawn() ---> Task B (ctx_e)
190    ///
191    /// Task A finishes or aborts --> Task B and Task C are aborted
192    /// ```
193    ///
194    /// # Spawn Configuration
195    ///
196    /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
197    /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
198    ///
199    /// Child tasks should assume they start from a clean configuration without needing to inspect how their
200    /// parent was configured.
201    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
202    where
203        F: FnOnce(Self) -> Fut + Send + 'static,
204        Fut: Future<Output = T> + Send + 'static,
205        T: Send + 'static;
206
207    /// Signals the runtime to stop execution and waits for all outstanding tasks
208    /// to perform any required cleanup and exit.
209    ///
210    /// This method does not actually kill any tasks but rather signals to them, using
211    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
212    /// It then waits for all [signal::Signal] references to be dropped before returning.
213    ///
214    /// ## Multiple Stop Calls
215    ///
216    /// This method is idempotent and safe to call multiple times concurrently (on
217    /// different instances of the same context since it consumes `self`). The first
218    /// call initiates shutdown with the provided `value`, and all subsequent calls
219    /// will wait for the same completion regardless of their `value` parameter, i.e.
220    /// the original `value` from the first call is preserved.
221    ///
222    /// ## Timeout
223    ///
224    /// If a timeout is provided, the method will return an error if all [signal::Signal]
225    /// references have not been dropped within the specified duration.
226    fn stop(
227        self,
228        value: i32,
229        timeout: Option<Duration>,
230    ) -> impl Future<Output = Result<(), Error>> + Send;
231
232    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
233    /// any task.
234    ///
235    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
236    /// immediately. The [signal::Signal] returned will always resolve to the value of the
237    /// first [Spawner::stop] call.
238    fn stopped(&self) -> signal::Signal;
239}
240
241/// Trait for creating [rayon]-compatible thread pools with each worker thread
242/// placed on dedicated threads via [Spawner].
243pub trait RayonPoolSpawner: Spawner + Metrics {
244    /// Creates a clone-able [rayon]-compatible thread pool with [Spawner::spawn].
245    ///
246    /// # Arguments
247    /// - `concurrency`: The number of tasks to execute concurrently in the pool.
248    ///
249    /// # Returns
250    /// A `Result` containing the configured [rayon::ThreadPool] or a [rayon::ThreadPoolBuildError] if the pool cannot
251    /// be built.
252    fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError>;
253
254    /// Creates a clone-able [Rayon] strategy for use with [commonware_parallel].
255    ///
256    /// # Arguments
257    /// - `concurrency`: The number of tasks to execute concurrently in the pool.
258    ///
259    /// # Returns
260    /// A `Result` containing the configured [Rayon] strategy or a [rayon::ThreadPoolBuildError] if the pool cannot be
261    /// built.
262    fn create_strategy(&self, concurrency: NonZeroUsize) -> Result<Rayon, ThreadPoolBuildError> {
263        self.create_pool(concurrency).map(Rayon::with_pool)
264    }
265}
266
267/// Interface to register and encode metrics.
268pub trait Metrics: Clone + Send + Sync + 'static {
269    /// Get the current label of the context.
270    fn label(&self) -> String;
271
272    /// Create a new instance of `Metrics` with the given label appended to the end
273    /// of the current `Metrics` label.
274    ///
275    /// This is commonly used to create a nested context for `register`.
276    ///
277    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
278    /// label (reserved for metrics for the runtime).
279    fn with_label(&self, label: &str) -> Self;
280
281    /// Prefix the given label with the current context's label.
282    ///
283    /// Unlike `with_label`, this method does not create a new context.
284    fn scoped_label(&self, label: &str) -> String {
285        let label = if self.label().is_empty() {
286            label.to_string()
287        } else {
288            format!("{}_{}", self.label(), label)
289        };
290        assert!(
291            !label.starts_with(METRICS_PREFIX),
292            "using runtime label is not allowed"
293        );
294        label
295    }
296
297    /// Register a metric with the runtime.
298    ///
299    /// Any registered metric will include (as a prefix) the label of the current context.
300    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
301
302    /// Encode all metrics into a buffer.
303    fn encode(&self) -> String;
304}
305
306/// Re-export of [governor::Quota] for rate limiting configuration.
307pub use governor::Quota;
308
309/// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
310///
311/// This is a convenience type alias for creating single-entity rate limiters.
312/// For per-key rate limiting, use [KeyedRateLimiter].
313pub type RateLimiter<C> = governor::RateLimiter<
314    governor::state::NotKeyed,
315    governor::state::InMemoryState,
316    C,
317    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
318>;
319
320/// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
321///
322/// This is a convenience type alias for creating per-peer rate limiters
323/// using governor's [HashMapStateStore].
324///
325/// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
326pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
327    K,
328    governor::state::keyed::HashMapStateStore<K>,
329    C,
330    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
331>;
332
333/// Interface that any task scheduler must implement to provide
334/// time-based operations.
335///
336/// It is necessary to mock time to provide deterministic execution
337/// of arbitrary tasks.
338pub trait Clock:
339    governor::clock::Clock<Instant = SystemTime>
340    + governor::clock::ReasonablyRealtime
341    + Clone
342    + Send
343    + Sync
344    + 'static
345{
346    /// Returns the current time.
347    fn current(&self) -> SystemTime;
348
349    /// Sleep for the given duration.
350    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
351
352    /// Sleep until the given deadline.
353    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
354
355    /// Await a future with a timeout, returning `Error::Timeout` if it expires.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// use std::time::Duration;
361    /// use commonware_runtime::{deterministic, Error, Runner, Clock};
362    ///
363    /// let executor = deterministic::Runner::default();
364    /// executor.start(|context| async move {
365    ///     match context
366    ///         .timeout(Duration::from_millis(100), async { 42 })
367    ///         .await
368    ///     {
369    ///         Ok(value) => assert_eq!(value, 42),
370    ///         Err(Error::Timeout) => panic!("should not timeout"),
371    ///         Err(e) => panic!("unexpected error: {:?}", e),
372    ///     }
373    /// });
374    /// ```
375    fn timeout<F, T>(
376        &self,
377        duration: Duration,
378        future: F,
379    ) -> impl Future<Output = Result<T, Error>> + Send + '_
380    where
381        F: Future<Output = T> + Send + 'static,
382        T: Send + 'static,
383    {
384        async move {
385            select! {
386                result = future => {
387                    Ok(result)
388                },
389                _ = self.sleep(duration) => {
390                    Err(Error::Timeout)
391                },
392            }
393        }
394    }
395}
396
397cfg_if::cfg_if! {
398    if #[cfg(feature = "external")] {
399        /// Interface that runtimes can implement to constrain the execution latency of a future.
400        pub trait Pacer: Clock + Clone + Send + Sync + 'static {
401            /// Defer completion of a future until a specified `latency` has elapsed. If the future is
402            /// not yet ready at the desired time of completion, the runtime will block until the future
403            /// is ready.
404            ///
405            /// In [crate::deterministic], this is used to ensure interactions with external systems can
406            /// be interacted with deterministically. In [crate::tokio], this is a no-op (allows
407            /// multiple runtimes to be tested with no code changes).
408            ///
409            /// # Setting Latency
410            ///
411            /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
412            /// the expected resolution latency of the future. To better explore the possible behavior of an
413            /// application, users can set latency to a randomly chosen value in the range of
414            /// `[expected latency / 2, expected latency * 2]`.
415            ///
416            /// # Warning
417            ///
418            /// Because `pace` blocks if the future is not ready, it is important that the future's completion
419            /// doesn't require anything in the current thread to complete (or else it will deadlock).
420            fn pace<'a, F, T>(
421                &'a self,
422                latency: Duration,
423                future: F,
424            ) -> impl Future<Output = T> + Send + 'a
425            where
426                F: Future<Output = T> + Send + 'a,
427                T: Send + 'a;
428        }
429
430        /// Extension trait that makes it more ergonomic to use [Pacer].
431        ///
432        /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
433        /// runtime should delay completion relative to the clock.
434        pub trait FutureExt: Future + Send + Sized {
435            /// Delay completion of the future until a specified `latency` on `pacer`.
436            fn pace<'a, E>(
437                self,
438                pacer: &'a E,
439                latency: Duration,
440            ) -> impl Future<Output = Self::Output> + Send + 'a
441            where
442                E: Pacer + 'a,
443                Self: Send + 'a,
444                Self::Output: Send + 'a,
445            {
446                pacer.pace(latency, self)
447            }
448        }
449
450        impl<F> FutureExt for F where F: Future + Send {}
451    }
452}
453
454/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
455pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
456
457/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
458pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
459
460/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
461pub type ListenerOf<N> = <N as crate::Network>::Listener;
462
463/// Interface that any runtime must implement to create
464/// network connections.
465pub trait Network: Clone + Send + Sync + 'static {
466    /// The type of [Listener] that's returned when binding to a socket.
467    /// Accepting a connection returns a [Sink] and [Stream] which are defined
468    /// by the [Listener] and used to send and receive data over the connection.
469    type Listener: Listener;
470
471    /// Bind to the given socket address.
472    fn bind(
473        &self,
474        socket: SocketAddr,
475    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
476
477    /// Dial the given socket address.
478    fn dial(
479        &self,
480        socket: SocketAddr,
481    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
482}
483
484/// Interface for DNS resolution.
485pub trait Resolver: Clone + Send + Sync + 'static {
486    /// Resolve a hostname to IP addresses.
487    ///
488    /// Returns a list of IP addresses that the hostname resolves to.
489    fn resolve(
490        &self,
491        host: &str,
492    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
493}
494
495/// Interface that any runtime must implement to handle
496/// incoming network connections.
497pub trait Listener: Sync + Send + 'static {
498    /// The type of [Sink] that's returned when accepting a connection.
499    /// This is used to send data to the remote connection.
500    type Sink: Sink;
501    /// The type of [Stream] that's returned when accepting a connection.
502    /// This is used to receive data from the remote connection.
503    type Stream: Stream;
504
505    /// Accept an incoming connection.
506    fn accept(
507        &mut self,
508    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
509
510    /// Returns the local address of the listener.
511    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
512}
513
514/// Interface that any runtime must implement to send
515/// messages over a network connection.
516pub trait Sink: Sync + Send + 'static {
517    /// Send a message to the sink.
518    ///
519    /// # Warning
520    ///
521    /// If the sink returns an error, part of the message may still be delivered.
522    fn send(&mut self, msg: impl Buf + Send) -> impl Future<Output = Result<(), Error>> + Send;
523}
524
525/// Interface that any runtime must implement to receive
526/// messages over a network connection.
527pub trait Stream: Sync + Send + 'static {
528    /// Receive a message from the stream, storing it in the given buffer.
529    /// Reads exactly the number of bytes that fit in the buffer.
530    ///
531    /// # Warning
532    ///
533    /// If the stream returns an error, partially read data may be discarded.
534    fn recv(&mut self, buf: impl BufMut + Send) -> impl Future<Output = Result<(), Error>> + Send;
535}
536
537/// Interface to interact with storage.
538///
539/// To support storage implementations that enable concurrent reads and
540/// writes, blobs are responsible for maintaining synchronization.
541///
542/// Storage can be backed by a local filesystem, cloud storage, etc.
543///
544/// # Partition Names
545///
546/// Partition names must be non-empty and contain only ASCII alphanumeric
547/// characters, dashes (`-`), or underscores (`_`). Names containing other
548/// characters (e.g., `/`, `.`, spaces) will return an error.
549pub trait Storage: Clone + Send + Sync + 'static {
550    /// The readable/writeable storage buffer that can be opened by this Storage.
551    type Blob: Blob;
552
553    /// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
554    /// in the versions range. The blob version is omitted from the return value.
555    fn open(
556        &self,
557        partition: &str,
558        name: &[u8],
559    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
560        async move {
561            let (blob, size, _) = self
562                .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
563                .await?;
564            Ok((blob, size))
565        }
566    }
567
568    /// Open an existing blob in a given partition or create a new one, returning
569    /// the blob and its length.
570    ///
571    /// Multiple instances of the same blob can be opened concurrently, however,
572    /// writing to the same blob concurrently may lead to undefined behavior.
573    ///
574    /// An Ok result indicates the blob is durably created (or already exists).
575    ///
576    /// # Versions
577    ///
578    /// Blobs are versioned. If the blob's version is not in `versions`, returns
579    /// [Error::BlobVersionMismatch].
580    ///
581    /// # Returns
582    ///
583    /// A tuple of (blob, logical_size, blob_version).
584    fn open_versioned(
585        &self,
586        partition: &str,
587        name: &[u8],
588        versions: std::ops::RangeInclusive<u16>,
589    ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
590
591    /// Remove a blob from a given partition.
592    ///
593    /// If no `name` is provided, the entire partition is removed.
594    ///
595    /// An Ok result indicates the blob is durably removed.
596    fn remove(
597        &self,
598        partition: &str,
599        name: Option<&[u8]>,
600    ) -> impl Future<Output = Result<(), Error>> + Send;
601
602    /// Return all blobs in a given partition.
603    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
604}
605
606/// Interface to read and write to a blob.
607///
608/// To support blob implementations that enable concurrent reads and
609/// writes, blobs are responsible for maintaining synchronization.
610///
611/// Cloning a blob is similar to wrapping a single file descriptor in
612/// a lock whereas opening a new blob (of the same name) is similar to
613/// opening a new file descriptor. If multiple blobs are opened with the same
614/// name, they are not expected to coordinate access to underlying storage
615/// and writing to both is undefined behavior.
616///
617/// When a blob is dropped, any unsynced changes may be discarded. Implementations
618/// may attempt to sync during drop but errors will go unhandled. Call `sync`
619/// before dropping to ensure all changes are durably persisted.
620#[allow(clippy::len_without_is_empty)]
621pub trait Blob: Clone + Send + Sync + 'static {
622    /// Read from the blob at the given offset.
623    ///
624    /// `read_at` does not return the number of bytes read because it
625    /// only returns once the entire buffer has been filled.
626    fn read_at(
627        &self,
628        buf: impl Into<StableBuf> + Send,
629        offset: u64,
630    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
631
632    /// Write `buf` to the blob at the given offset.
633    fn write_at(
634        &self,
635        buf: impl Into<StableBuf> + Send,
636        offset: u64,
637    ) -> impl Future<Output = Result<(), Error>> + Send;
638
639    /// Resize the blob to the given length.
640    ///
641    /// If the length is greater than the current length, the blob is extended with zeros.
642    /// If the length is less than the current length, the blob is resized.
643    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
644
645    /// Ensure all pending data is durably persisted.
646    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
647}
648
649#[cfg(test)]
650mod tests {
651    use super::*;
652    use crate::telemetry::traces::collector::TraceStorage;
653    use bytes::Bytes;
654    use commonware_macros::{select, test_collect_traces};
655    use commonware_utils::NZUsize;
656    use futures::{
657        channel::{mpsc, oneshot},
658        future::{pending, ready},
659        join, pin_mut, FutureExt, SinkExt, StreamExt,
660    };
661    use prometheus_client::metrics::counter::Counter;
662    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
663    use std::{
664        collections::HashMap,
665        net::{IpAddr, Ipv4Addr, Ipv6Addr},
666        pin::Pin,
667        str::FromStr,
668        sync::{
669            atomic::{AtomicU32, Ordering},
670            Arc, Mutex,
671        },
672        task::{Context as TContext, Poll, Waker},
673    };
674    use tracing::{error, Level};
675    use utils::reschedule;
676
677    fn test_error_future<R: Runner>(runner: R) {
678        async fn error_future() -> Result<&'static str, &'static str> {
679            Err("An error occurred")
680        }
681        let result = runner.start(|_| error_future());
682        assert_eq!(result, Err("An error occurred"));
683    }
684
685    fn test_clock_sleep<R: Runner>(runner: R)
686    where
687        R::Context: Spawner + Clock,
688    {
689        runner.start(|context| async move {
690            // Capture initial time
691            let start = context.current();
692            let sleep_duration = Duration::from_millis(10);
693            context.sleep(sleep_duration).await;
694
695            // After run, time should have advanced
696            let end = context.current();
697            assert!(end.duration_since(start).unwrap() >= sleep_duration);
698        });
699    }
700
701    fn test_clock_sleep_until<R: Runner>(runner: R)
702    where
703        R::Context: Spawner + Clock + Metrics,
704    {
705        runner.start(|context| async move {
706            // Trigger sleep
707            let now = context.current();
708            context.sleep_until(now + Duration::from_millis(100)).await;
709
710            // Ensure slept duration has elapsed
711            let elapsed = now.elapsed().unwrap();
712            assert!(elapsed >= Duration::from_millis(100));
713        });
714    }
715
716    fn test_clock_timeout<R: Runner>(runner: R)
717    where
718        R::Context: Spawner + Clock,
719    {
720        runner.start(|context| async move {
721            // Future completes before timeout
722            let result = context
723                .timeout(Duration::from_millis(100), async { "success" })
724                .await;
725            assert_eq!(result.unwrap(), "success");
726
727            // Future exceeds timeout duration
728            let result = context
729                .timeout(Duration::from_millis(50), pending::<()>())
730                .await;
731            assert!(matches!(result, Err(Error::Timeout)));
732
733            // Future completes within timeout
734            let result = context
735                .timeout(
736                    Duration::from_millis(100),
737                    context.sleep(Duration::from_millis(50)),
738                )
739                .await;
740            assert!(result.is_ok());
741        });
742    }
743
744    fn test_root_finishes<R: Runner>(runner: R)
745    where
746        R::Context: Spawner,
747    {
748        runner.start(|context| async move {
749            context.spawn(|_| async move {
750                loop {
751                    reschedule().await;
752                }
753            });
754        });
755    }
756
757    fn test_spawn_after_abort<R>(runner: R)
758    where
759        R: Runner,
760        R::Context: Spawner + Clone,
761    {
762        runner.start(|context| async move {
763            // Create a child context
764            let child = context.clone();
765
766            // Spawn parent and abort
767            let parent_handle = context.spawn(move |_| async move {
768                pending::<()>().await;
769            });
770            parent_handle.abort();
771
772            // Spawn child and ensure it aborts
773            let child_handle = child.spawn(move |_| async move {
774                pending::<()>().await;
775            });
776            assert!(matches!(child_handle.await, Err(Error::Closed)));
777        });
778    }
779
780    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
781    where
782        R::Context: Spawner,
783    {
784        runner.start(|context| async move {
785            let context = if dedicated {
786                assert!(!blocking);
787                context.dedicated()
788            } else {
789                context.shared(blocking)
790            };
791
792            let handle = context.spawn(|_| async move {
793                loop {
794                    reschedule().await;
795                }
796            });
797            handle.abort();
798            assert!(matches!(handle.await, Err(Error::Closed)));
799        });
800    }
801
802    fn test_panic_aborts_root<R: Runner>(runner: R) {
803        let result: Result<(), Error> = runner.start(|_| async move {
804            panic!("blah");
805        });
806        result.unwrap_err();
807    }
808
809    fn test_panic_aborts_spawn<R: Runner>(runner: R)
810    where
811        R::Context: Spawner + Clock,
812    {
813        runner.start(|context| async move {
814            context.clone().spawn(|_| async move {
815                panic!("blah");
816            });
817
818            // Loop until panic
819            loop {
820                context.sleep(Duration::from_millis(100)).await;
821            }
822        });
823    }
824
825    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
826    where
827        R::Context: Spawner + Clock,
828    {
829        let result: Result<(), Error> = runner.start(|context| async move {
830            let result = context.clone().spawn(|_| async move {
831                panic!("blah");
832            });
833            result.await
834        });
835        assert!(matches!(result, Err(Error::Exited)));
836    }
837
838    fn test_multiple_panics<R: Runner>(runner: R)
839    where
840        R::Context: Spawner + Clock,
841    {
842        runner.start(|context| async move {
843            context.clone().spawn(|_| async move {
844                panic!("boom 1");
845            });
846            context.clone().spawn(|_| async move {
847                panic!("boom 2");
848            });
849            context.clone().spawn(|_| async move {
850                panic!("boom 3");
851            });
852
853            // Loop until panic
854            loop {
855                context.sleep(Duration::from_millis(100)).await;
856            }
857        });
858    }
859
860    fn test_multiple_panics_caught<R: Runner>(runner: R)
861    where
862        R::Context: Spawner + Clock,
863    {
864        let (res1, res2, res3) = runner.start(|context| async move {
865            let handle1 = context.clone().spawn(|_| async move {
866                panic!("boom 1");
867            });
868            let handle2 = context.clone().spawn(|_| async move {
869                panic!("boom 2");
870            });
871            let handle3 = context.clone().spawn(|_| async move {
872                panic!("boom 3");
873            });
874
875            join!(handle1, handle2, handle3)
876        });
877        assert!(matches!(res1, Err(Error::Exited)));
878        assert!(matches!(res2, Err(Error::Exited)));
879        assert!(matches!(res3, Err(Error::Exited)));
880    }
881
882    fn test_select<R: Runner>(runner: R) {
883        runner.start(|_| async move {
884            // Test first branch
885            let output = Mutex::new(0);
886            select! {
887                v1 = ready(1) => {
888                    *output.lock().unwrap() = v1;
889                },
890                v2 = ready(2) => {
891                    *output.lock().unwrap() = v2;
892                },
893            };
894            assert_eq!(*output.lock().unwrap(), 1);
895
896            // Test second branch
897            select! {
898                v1 = std::future::pending::<i32>() => {
899                    *output.lock().unwrap() = v1;
900                },
901                v2 = ready(2) => {
902                    *output.lock().unwrap() = v2;
903                },
904            };
905            assert_eq!(*output.lock().unwrap(), 2);
906        });
907    }
908
909    /// Ensure future fusing works as expected.
910    fn test_select_loop<R: Runner>(runner: R)
911    where
912        R::Context: Clock,
913    {
914        runner.start(|context| async move {
915            // Should hit timeout
916            let (mut sender, mut receiver) = mpsc::unbounded();
917            for _ in 0..2 {
918                select! {
919                    v = receiver.next() => {
920                        panic!("unexpected value: {v:?}");
921                    },
922                    _ = context.sleep(Duration::from_millis(100)) => {
923                        continue;
924                    },
925                };
926            }
927
928            // Populate channel
929            sender.send(0).await.unwrap();
930            sender.send(1).await.unwrap();
931
932            // Prefer not reading channel without losing messages
933            select! {
934                _ = async {} => {
935                    // Skip reading from channel even though populated
936                },
937                v = receiver.next() => {
938                    panic!("unexpected value: {v:?}");
939                },
940            };
941
942            // Process messages
943            for i in 0..2 {
944                select! {
945                    _ = context.sleep(Duration::from_millis(100)) => {
946                        panic!("timeout");
947                    },
948                    v = receiver.next() => {
949                        assert_eq!(v.unwrap(), i);
950                    },
951                };
952            }
953        });
954    }
955
956    fn test_storage_operations<R: Runner>(runner: R)
957    where
958        R::Context: Storage,
959    {
960        runner.start(|context| async move {
961            let partition = "test_partition";
962            let name = b"test_blob";
963
964            // Open a new blob
965            let (blob, size) = context
966                .open(partition, name)
967                .await
968                .expect("Failed to open blob");
969            assert_eq!(size, 0, "new blob should have size 0");
970
971            // Write data to the blob
972            let data = b"Hello, Storage!";
973            blob.write_at(Vec::from(data), 0)
974                .await
975                .expect("Failed to write to blob");
976
977            // Sync the blob
978            blob.sync().await.expect("Failed to sync blob");
979
980            // Read data from the blob
981            let read = blob
982                .read_at(vec![0; data.len()], 0)
983                .await
984                .expect("Failed to read from blob");
985            assert_eq!(read.as_ref(), data);
986
987            // Sync the blob
988            blob.sync().await.expect("Failed to sync blob");
989
990            // Scan blobs in the partition
991            let blobs = context
992                .scan(partition)
993                .await
994                .expect("Failed to scan partition");
995            assert!(blobs.contains(&name.to_vec()));
996
997            // Reopen the blob
998            let (blob, len) = context
999                .open(partition, name)
1000                .await
1001                .expect("Failed to reopen blob");
1002            assert_eq!(len, data.len() as u64);
1003
1004            // Read data part of message back
1005            let read = blob
1006                .read_at(vec![0u8; 7], 7)
1007                .await
1008                .expect("Failed to read data");
1009            assert_eq!(read.as_ref(), b"Storage");
1010
1011            // Sync the blob
1012            blob.sync().await.expect("Failed to sync blob");
1013
1014            // Remove the blob
1015            context
1016                .remove(partition, Some(name))
1017                .await
1018                .expect("Failed to remove blob");
1019
1020            // Ensure the blob is removed
1021            let blobs = context
1022                .scan(partition)
1023                .await
1024                .expect("Failed to scan partition");
1025            assert!(!blobs.contains(&name.to_vec()));
1026
1027            // Remove the partition
1028            context
1029                .remove(partition, None)
1030                .await
1031                .expect("Failed to remove partition");
1032
1033            // Scan the partition
1034            let result = context.scan(partition).await;
1035            assert!(matches!(result, Err(Error::PartitionMissing(_))));
1036        });
1037    }
1038
1039    fn test_blob_read_write<R: Runner>(runner: R)
1040    where
1041        R::Context: Storage,
1042    {
1043        runner.start(|context| async move {
1044            let partition = "test_partition";
1045            let name = b"test_blob_rw";
1046
1047            // Open a new blob
1048            let (blob, _) = context
1049                .open(partition, name)
1050                .await
1051                .expect("Failed to open blob");
1052
1053            // Write data at different offsets
1054            let data1 = b"Hello";
1055            let data2 = b"World";
1056            blob.write_at(Vec::from(data1), 0)
1057                .await
1058                .expect("Failed to write data1");
1059            blob.write_at(Vec::from(data2), 5)
1060                .await
1061                .expect("Failed to write data2");
1062
1063            // Read data back
1064            let read = blob
1065                .read_at(vec![0u8; 10], 0)
1066                .await
1067                .expect("Failed to read data");
1068            assert_eq!(&read.as_ref()[..5], data1);
1069            assert_eq!(&read.as_ref()[5..], data2);
1070
1071            // Read past end of blob
1072            let result = blob.read_at(vec![0u8; 10], 10).await;
1073            assert!(result.is_err());
1074
1075            // Rewrite data without affecting length
1076            let data3 = b"Store";
1077            blob.write_at(Vec::from(data3), 5)
1078                .await
1079                .expect("Failed to write data3");
1080
1081            // Read data back
1082            let read = blob
1083                .read_at(vec![0u8; 10], 0)
1084                .await
1085                .expect("Failed to read data");
1086            assert_eq!(&read.as_ref()[..5], data1);
1087            assert_eq!(&read.as_ref()[5..], data3);
1088
1089            // Read past end of blob
1090            let result = blob.read_at(vec![0u8; 10], 10).await;
1091            assert!(result.is_err());
1092        });
1093    }
1094
1095    fn test_blob_resize<R: Runner>(runner: R)
1096    where
1097        R::Context: Storage,
1098    {
1099        runner.start(|context| async move {
1100            let partition = "test_partition_resize";
1101            let name = b"test_blob_resize";
1102
1103            // Open and write to a new blob
1104            let (blob, _) = context
1105                .open(partition, name)
1106                .await
1107                .expect("Failed to open blob");
1108
1109            let data = b"some data";
1110            blob.write_at(data.to_vec(), 0)
1111                .await
1112                .expect("Failed to write");
1113            blob.sync().await.expect("Failed to sync after write");
1114
1115            // Re-open and check length
1116            let (blob, len) = context.open(partition, name).await.unwrap();
1117            assert_eq!(len, data.len() as u64);
1118
1119            // Resize to extend the file
1120            let new_len = (data.len() as u64) * 2;
1121            blob.resize(new_len)
1122                .await
1123                .expect("Failed to resize to extend");
1124            blob.sync().await.expect("Failed to sync after resize");
1125
1126            // Re-open and check length again
1127            let (blob, len) = context.open(partition, name).await.unwrap();
1128            assert_eq!(len, new_len);
1129
1130            // Read original data
1131            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1132            assert_eq!(read_buf.as_ref(), data);
1133
1134            // Read extended part (should be zeros)
1135            let extended_part = blob
1136                .read_at(vec![0; data.len()], data.len() as u64)
1137                .await
1138                .unwrap();
1139            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1140
1141            // Truncate the blob
1142            blob.resize(data.len() as u64).await.unwrap();
1143            blob.sync().await.unwrap();
1144
1145            // Reopen to check truncation
1146            let (blob, size) = context.open(partition, name).await.unwrap();
1147            assert_eq!(size, data.len() as u64);
1148
1149            // Read truncated data
1150            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1151            assert_eq!(read_buf.as_ref(), data);
1152            blob.sync().await.unwrap();
1153        });
1154    }
1155
1156    fn test_many_partition_read_write<R: Runner>(runner: R)
1157    where
1158        R::Context: Storage,
1159    {
1160        runner.start(|context| async move {
1161            let partitions = ["partition1", "partition2", "partition3"];
1162            let name = b"test_blob_rw";
1163            let data1 = b"Hello";
1164            let data2 = b"World";
1165
1166            for (additional, partition) in partitions.iter().enumerate() {
1167                // Open a new blob
1168                let (blob, _) = context
1169                    .open(partition, name)
1170                    .await
1171                    .expect("Failed to open blob");
1172
1173                // Write data at different offsets
1174                blob.write_at(Vec::from(data1), 0)
1175                    .await
1176                    .expect("Failed to write data1");
1177                blob.write_at(Vec::from(data2), 5 + additional as u64)
1178                    .await
1179                    .expect("Failed to write data2");
1180
1181                // Sync the blob
1182                blob.sync().await.expect("Failed to sync blob");
1183            }
1184
1185            for (additional, partition) in partitions.iter().enumerate() {
1186                // Open a new blob
1187                let (blob, len) = context
1188                    .open(partition, name)
1189                    .await
1190                    .expect("Failed to open blob");
1191                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1192
1193                // Read data back
1194                let read = blob
1195                    .read_at(vec![0u8; 10 + additional], 0)
1196                    .await
1197                    .expect("Failed to read data");
1198                assert_eq!(&read.as_ref()[..5], b"Hello");
1199                assert_eq!(&read.as_ref()[5 + additional..], b"World");
1200            }
1201        });
1202    }
1203
1204    fn test_blob_read_past_length<R: Runner>(runner: R)
1205    where
1206        R::Context: Storage,
1207    {
1208        runner.start(|context| async move {
1209            let partition = "test_partition";
1210            let name = b"test_blob_rw";
1211
1212            // Open a new blob
1213            let (blob, _) = context
1214                .open(partition, name)
1215                .await
1216                .expect("Failed to open blob");
1217
1218            // Read data past file length (empty file)
1219            let result = blob.read_at(vec![0u8; 10], 0).await;
1220            assert!(result.is_err());
1221
1222            // Write data to the blob
1223            let data = b"Hello, Storage!".to_vec();
1224            blob.write_at(data, 0)
1225                .await
1226                .expect("Failed to write to blob");
1227
1228            // Read data past file length (non-empty file)
1229            let result = blob.read_at(vec![0u8; 20], 0).await;
1230            assert!(result.is_err());
1231        })
1232    }
1233
1234    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1235    where
1236        R::Context: Spawner + Storage + Metrics,
1237    {
1238        runner.start(|context| async move {
1239            let partition = "test_partition";
1240            let name = b"test_blob_rw";
1241
1242            // Open a new blob
1243            let (blob, _) = context
1244                .open(partition, name)
1245                .await
1246                .expect("Failed to open blob");
1247
1248            // Write data to the blob
1249            let data = b"Hello, Storage!";
1250            blob.write_at(Vec::from(data), 0)
1251                .await
1252                .expect("Failed to write to blob");
1253
1254            // Sync the blob
1255            blob.sync().await.expect("Failed to sync blob");
1256
1257            // Read data from the blob in clone
1258            let check1 = context.with_label("check1").spawn({
1259                let blob = blob.clone();
1260                move |_| async move {
1261                    let read = blob
1262                        .read_at(vec![0u8; data.len()], 0)
1263                        .await
1264                        .expect("Failed to read from blob");
1265                    assert_eq!(read.as_ref(), data);
1266                }
1267            });
1268            let check2 = context.with_label("check2").spawn({
1269                let blob = blob.clone();
1270                move |_| async move {
1271                    let read = blob
1272                        .read_at(vec![0; data.len()], 0)
1273                        .await
1274                        .expect("Failed to read from blob");
1275                    assert_eq!(read.as_ref(), data);
1276                }
1277            });
1278
1279            // Wait for both reads to complete
1280            let result = join!(check1, check2);
1281            assert!(result.0.is_ok());
1282            assert!(result.1.is_ok());
1283
1284            // Read data from the blob
1285            let read = blob
1286                .read_at(vec![0; data.len()], 0)
1287                .await
1288                .expect("Failed to read from blob");
1289            assert_eq!(read.as_ref(), data);
1290
1291            // Drop the blob
1292            drop(blob);
1293
1294            // Ensure no blobs still open
1295            let buffer = context.encode();
1296            assert!(buffer.contains("open_blobs 0"));
1297        });
1298    }
1299
1300    fn test_shutdown<R: Runner>(runner: R)
1301    where
1302        R::Context: Spawner + Metrics + Clock,
1303    {
1304        let kill = 9;
1305        runner.start(|context| async move {
1306            // Spawn a task that waits for signal
1307            let before = context
1308                .with_label("before")
1309                .spawn(move |context| async move {
1310                    let mut signal = context.stopped();
1311                    let value = (&mut signal).await.unwrap();
1312                    assert_eq!(value, kill);
1313                    drop(signal);
1314                });
1315
1316            // Signal the tasks and wait for them to stop
1317            let result = context.clone().stop(kill, None).await;
1318            assert!(result.is_ok());
1319
1320            // Spawn a task after stop is called
1321            let after = context
1322                .with_label("after")
1323                .spawn(move |context| async move {
1324                    // A call to `stopped()` after `stop()` resolves immediately
1325                    let value = context.stopped().await.unwrap();
1326                    assert_eq!(value, kill);
1327                });
1328
1329            // Ensure both tasks complete
1330            let result = join!(before, after);
1331            assert!(result.0.is_ok());
1332            assert!(result.1.is_ok());
1333        });
1334    }
1335
1336    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1337    where
1338        R::Context: Spawner + Metrics + Clock,
1339    {
1340        let kill = 42;
1341        runner.start(|context| async move {
1342            let (started_tx, mut started_rx) = mpsc::channel(3);
1343            let counter = Arc::new(AtomicU32::new(0));
1344
1345            // Spawn 3 tasks that do cleanup work after receiving stop signal
1346            // and increment a shared counter
1347            let task = |cleanup_duration: Duration| {
1348                let context = context.clone();
1349                let counter = counter.clone();
1350                let mut started_tx = started_tx.clone();
1351                context.spawn(move |context| async move {
1352                    // Wait for signal to be acquired
1353                    let mut signal = context.stopped();
1354                    started_tx.send(()).await.unwrap();
1355
1356                    // Increment once killed
1357                    let value = (&mut signal).await.unwrap();
1358                    assert_eq!(value, kill);
1359                    context.sleep(cleanup_duration).await;
1360                    counter.fetch_add(1, Ordering::SeqCst);
1361
1362                    // Wait to drop signal until work has been done
1363                    drop(signal);
1364                })
1365            };
1366
1367            let task1 = task(Duration::from_millis(10));
1368            let task2 = task(Duration::from_millis(20));
1369            let task3 = task(Duration::from_millis(30));
1370
1371            // Give tasks time to start
1372            for _ in 0..3 {
1373                started_rx.next().await.unwrap();
1374            }
1375
1376            // Stop and verify all cleanup completed
1377            context.stop(kill, None).await.unwrap();
1378            assert_eq!(counter.load(Ordering::SeqCst), 3);
1379
1380            // Ensure all tasks completed
1381            let result = join!(task1, task2, task3);
1382            assert!(result.0.is_ok());
1383            assert!(result.1.is_ok());
1384            assert!(result.2.is_ok());
1385        });
1386    }
1387
1388    fn test_shutdown_timeout<R: Runner>(runner: R)
1389    where
1390        R::Context: Spawner + Metrics + Clock,
1391    {
1392        let kill = 42;
1393        runner.start(|context| async move {
1394            // Setup startup coordinator
1395            let (started_tx, started_rx) = oneshot::channel();
1396
1397            // Spawn a task that never completes its cleanup
1398            context.clone().spawn(move |context| async move {
1399                let signal = context.stopped();
1400                started_tx.send(()).unwrap();
1401                pending::<()>().await;
1402                signal.await.unwrap();
1403            });
1404
1405            // Try to stop with a timeout
1406            started_rx.await.unwrap();
1407            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1408
1409            // Assert that we got a timeout error
1410            assert!(matches!(result, Err(Error::Timeout)));
1411        });
1412    }
1413
1414    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1415    where
1416        R::Context: Spawner + Metrics + Clock,
1417    {
1418        let kill1 = 42;
1419        let kill2 = 43;
1420
1421        runner.start(|context| async move {
1422            let (started_tx, started_rx) = oneshot::channel();
1423            let counter = Arc::new(AtomicU32::new(0));
1424
1425            // Spawn a task that delays completion to test timing
1426            let task = context.with_label("blocking_task").spawn({
1427                let counter = counter.clone();
1428                move |context| async move {
1429                    // Wait for signal to be acquired
1430                    let mut signal = context.stopped();
1431                    started_tx.send(()).unwrap();
1432
1433                    // Wait for signal to be resolved
1434                    let value = (&mut signal).await.unwrap();
1435                    assert_eq!(value, kill1);
1436                    context.sleep(Duration::from_millis(50)).await;
1437
1438                    // Increment counter
1439                    counter.fetch_add(1, Ordering::SeqCst);
1440                    drop(signal);
1441                }
1442            });
1443
1444            // Give task time to start
1445            started_rx.await.unwrap();
1446
1447            // Issue two separate stop calls
1448            // The second stop call uses a different stop value that should be ignored
1449            let stop_task1 = context.clone().stop(kill1, None);
1450            pin_mut!(stop_task1);
1451            let stop_task2 = context.clone().stop(kill2, None);
1452            pin_mut!(stop_task2);
1453
1454            // Both of them should be awaiting completion
1455            assert!(stop_task1.as_mut().now_or_never().is_none());
1456            assert!(stop_task2.as_mut().now_or_never().is_none());
1457
1458            // Wait for both stop calls to complete
1459            assert!(stop_task1.await.is_ok());
1460            assert!(stop_task2.await.is_ok());
1461
1462            // Verify first stop value wins
1463            let sig = context.stopped().await;
1464            assert_eq!(sig.unwrap(), kill1);
1465
1466            // Wait for blocking task to complete
1467            let result = task.await;
1468            assert!(result.is_ok());
1469            assert_eq!(counter.load(Ordering::SeqCst), 1);
1470
1471            // Post-completion stop should return immediately
1472            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1473        });
1474    }
1475
1476    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1477    where
1478        R::Context: Spawner + Metrics,
1479    {
1480        runner.start(|context| async move {
1481            // Spawn a task that waits for signal
1482            context
1483                .with_label("before")
1484                .spawn(move |context| async move {
1485                    let mut signal = context.stopped();
1486                    let value = (&mut signal).await.unwrap();
1487
1488                    // We should never reach this point
1489                    assert_eq!(value, 42);
1490                    drop(signal);
1491                });
1492
1493            // Ensure waker is registered
1494            reschedule().await;
1495        });
1496    }
1497
1498    fn test_spawn_dedicated<R: Runner>(runner: R)
1499    where
1500        R::Context: Spawner,
1501    {
1502        runner.start(|context| async move {
1503            let handle = context.dedicated().spawn(|_| async move { 42 });
1504            assert!(matches!(handle.await, Ok(42)));
1505        });
1506    }
1507
1508    fn test_spawn<R: Runner>(runner: R)
1509    where
1510        R::Context: Spawner + Clock,
1511    {
1512        runner.start(|context| async move {
1513            let child_handle = Arc::new(Mutex::new(None));
1514            let child_handle2 = child_handle.clone();
1515
1516            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1517            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1518            let parent_handle = context.spawn(move |context| async move {
1519                // Spawn child that completes immediately
1520                let handle = context.spawn(|_| async {});
1521
1522                // Store child handle so we can test it later
1523                *child_handle2.lock().unwrap() = Some(handle);
1524
1525                parent_initialized_tx.send(()).unwrap();
1526
1527                // Parent task completes
1528                parent_complete_rx.await.unwrap();
1529            });
1530
1531            // Wait for parent task to spawn the children
1532            parent_initialized_rx.await.unwrap();
1533
1534            // Child task completes successfully
1535            let child_handle = child_handle.lock().unwrap().take().unwrap();
1536            assert!(child_handle.await.is_ok());
1537
1538            // Complete the parent task
1539            parent_complete_tx.send(()).unwrap();
1540
1541            // Wait for parent task to complete successfully
1542            assert!(parent_handle.await.is_ok());
1543        });
1544    }
1545
1546    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1547    where
1548        R::Context: Spawner + Clock,
1549    {
1550        runner.start(|context| async move {
1551            let child_handle = Arc::new(Mutex::new(None));
1552            let child_handle2 = child_handle.clone();
1553
1554            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1555            let parent_handle = context.spawn(move |context| async move {
1556                // Spawn child task that hangs forever, should be aborted when parent aborts
1557                let handle = context.spawn(|_| pending::<()>());
1558
1559                // Store child task handle so we can test it later
1560                *child_handle2.lock().unwrap() = Some(handle);
1561
1562                parent_initialized_tx.send(()).unwrap();
1563
1564                // Parent task runs until aborted
1565                pending::<()>().await
1566            });
1567
1568            // Wait for parent task to spawn the children
1569            parent_initialized_rx.await.unwrap();
1570
1571            // Abort parent task
1572            parent_handle.abort();
1573            assert!(matches!(parent_handle.await, Err(Error::Closed)));
1574
1575            // Child task should also resolve with error since its parent aborted
1576            let child_handle = child_handle.lock().unwrap().take().unwrap();
1577            assert!(matches!(child_handle.await, Err(Error::Closed)));
1578        });
1579    }
1580
1581    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1582    where
1583        R::Context: Spawner + Clock,
1584    {
1585        runner.start(|context| async move {
1586            let child_handle = Arc::new(Mutex::new(None));
1587            let child_handle2 = child_handle.clone();
1588
1589            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1590            let parent_handle = context.spawn(move |context| async move {
1591                // Spawn child task that hangs forever, should be aborted when parent completes
1592                let handle = context.spawn(|_| pending::<()>());
1593
1594                // Store child task handle so we can test it later
1595                *child_handle2.lock().unwrap() = Some(handle);
1596
1597                // Parent task completes
1598                parent_complete_rx.await.unwrap();
1599            });
1600
1601            // Fire parent completion
1602            parent_complete_tx.send(()).unwrap();
1603
1604            // Wait for parent task to complete
1605            assert!(parent_handle.await.is_ok());
1606
1607            // Child task should resolve with error since its parent has completed
1608            let child_handle = child_handle.lock().unwrap().take().unwrap();
1609            assert!(matches!(child_handle.await, Err(Error::Closed)));
1610        });
1611    }
1612
1613    fn test_spawn_cascading_abort<R: Runner>(runner: R)
1614    where
1615        R::Context: Spawner + Clock,
1616    {
1617        runner.start(|context| async move {
1618            // We create the following tree of tasks. All tasks will run
1619            // indefinitely (until aborted).
1620            //
1621            //          root
1622            //     /     |     \
1623            //    /      |      \
1624            //   c0      c1      c2
1625            //  /  \    /  \    /  \
1626            // g0  g1  g2  g3  g4  g5
1627            let c0 = context.clone();
1628            let g0 = c0.clone();
1629            let g1 = c0.clone();
1630            let c1 = context.clone();
1631            let g2 = c1.clone();
1632            let g3 = c1.clone();
1633            let c2 = context.clone();
1634            let g4 = c2.clone();
1635            let g5 = c2.clone();
1636
1637            // Spawn tasks
1638            let handles = Arc::new(Mutex::new(Vec::new()));
1639            let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
1640            let root_task = context.spawn({
1641                let handles = handles.clone();
1642                move |_| async move {
1643                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1644                    {
1645                        let handle = context.spawn({
1646                            let handles = handles.clone();
1647                            let mut initialized_tx = initialized_tx.clone();
1648                            move |_| async move {
1649                                for grandchild in grandchildren {
1650                                    let handle = grandchild.spawn(|_| async {
1651                                        pending::<()>().await;
1652                                    });
1653                                    handles.lock().unwrap().push(handle);
1654                                    initialized_tx.send(()).await.unwrap();
1655                                }
1656
1657                                pending::<()>().await;
1658                            }
1659                        });
1660                        handles.lock().unwrap().push(handle);
1661                        initialized_tx.send(()).await.unwrap();
1662                    }
1663
1664                    pending::<()>().await;
1665                }
1666            });
1667
1668            // Wait for tasks to initialize
1669            for _ in 0..9 {
1670                initialized_rx.next().await.unwrap();
1671            }
1672
1673            // Verify we have all 9 handles (3 children + 6 grandchildren)
1674            assert_eq!(handles.lock().unwrap().len(), 9);
1675
1676            // Abort root task
1677            root_task.abort();
1678            assert!(matches!(root_task.await, Err(Error::Closed)));
1679
1680            // All handles should resolve with error due to cascading abort
1681            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1682            for handle in handles {
1683                assert!(matches!(handle.await, Err(Error::Closed)));
1684            }
1685        });
1686    }
1687
1688    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1689    where
1690        R::Context: Spawner + Clock,
1691    {
1692        runner.start(|context| async move {
1693            let (child_started_tx, child_started_rx) = oneshot::channel();
1694            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1695            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1696            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1697            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1698            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1699            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1700
1701            let parent = context.spawn(move |context| async move {
1702                // Spawn a child task
1703                let child_handle = context.clone().spawn(|_| async move {
1704                    child_started_tx.send(()).unwrap();
1705                    // Wait for signal to complete
1706                    child_complete_rx.await.unwrap();
1707                });
1708                assert!(
1709                    child_handle_tx.send(child_handle).is_ok(),
1710                    "child handle receiver dropped"
1711                );
1712
1713                // Spawn an independent sibling task
1714                let sibling_handle = context.clone().spawn(move |_| async move {
1715                    sibling_started_tx.send(()).unwrap();
1716                    // Wait for signal to complete
1717                    sibling_complete_rx.await.unwrap();
1718                });
1719                assert!(
1720                    sibling_handle_tx.send(sibling_handle).is_ok(),
1721                    "sibling handle receiver dropped"
1722                );
1723
1724                // Wait for signal to complete
1725                parent_complete_rx.await.unwrap();
1726            });
1727
1728            // Wait for both to start
1729            child_started_rx.await.unwrap();
1730            sibling_started_rx.await.unwrap();
1731
1732            // Kill the sibling
1733            sibling_complete_tx.send(()).unwrap();
1734            assert!(sibling_handle_rx.await.is_ok());
1735
1736            // The child task should still be alive
1737            child_complete_tx.send(()).unwrap();
1738            assert!(child_handle_rx.await.is_ok());
1739
1740            // As well as the parent
1741            parent_complete_tx.send(()).unwrap();
1742            assert!(parent.await.is_ok());
1743        });
1744    }
1745
1746    fn test_spawn_clone_chain<R: Runner>(runner: R)
1747    where
1748        R::Context: Spawner + Clock,
1749    {
1750        runner.start(|context| async move {
1751            let (parent_started_tx, parent_started_rx) = oneshot::channel();
1752            let (child_started_tx, child_started_rx) = oneshot::channel();
1753            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1754            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1755            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1756
1757            let parent = context.clone().spawn({
1758                move |context| async move {
1759                    let child = context.clone().spawn({
1760                        move |context| async move {
1761                            let grandchild = context.clone().spawn({
1762                                move |_| async move {
1763                                    grandchild_started_tx.send(()).unwrap();
1764                                    pending::<()>().await;
1765                                }
1766                            });
1767                            assert!(
1768                                grandchild_handle_tx.send(grandchild).is_ok(),
1769                                "grandchild handle receiver dropped"
1770                            );
1771                            child_started_tx.send(()).unwrap();
1772                            pending::<()>().await;
1773                        }
1774                    });
1775                    assert!(
1776                        child_handle_tx.send(child).is_ok(),
1777                        "child handle receiver dropped"
1778                    );
1779                    parent_started_tx.send(()).unwrap();
1780                    pending::<()>().await;
1781                }
1782            });
1783
1784            parent_started_rx.await.unwrap();
1785            child_started_rx.await.unwrap();
1786            grandchild_started_rx.await.unwrap();
1787
1788            let child_handle = child_handle_rx.await.unwrap();
1789            let grandchild_handle = grandchild_handle_rx.await.unwrap();
1790
1791            parent.abort();
1792            assert!(parent.await.is_err());
1793
1794            assert!(child_handle.await.is_err());
1795            assert!(grandchild_handle.await.is_err());
1796        });
1797    }
1798
1799    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1800    where
1801        R::Context: Spawner + Clock,
1802    {
1803        runner.start(|context| async move {
1804            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1805            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1806
1807            let parent = context.clone().spawn({
1808                move |context| async move {
1809                    let clone1 = context.clone();
1810                    let clone2 = clone1.clone();
1811                    let clone3 = clone2.clone();
1812
1813                    let leaf = clone3.clone().spawn({
1814                        move |_| async move {
1815                            leaf_started_tx.send(()).unwrap();
1816                            pending::<()>().await;
1817                        }
1818                    });
1819
1820                    leaf_handle_tx
1821                        .send(leaf)
1822                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1823                    pending::<()>().await;
1824                }
1825            });
1826
1827            leaf_started_rx.await.unwrap();
1828            let leaf_handle = leaf_handle_rx.await.unwrap();
1829
1830            parent.abort();
1831            assert!(parent.await.is_err());
1832            assert!(leaf_handle.await.is_err());
1833        });
1834    }
1835
1836    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1837    where
1838        R::Context: Spawner,
1839    {
1840        runner.start(|context| async move {
1841            let context = if dedicated {
1842                context.dedicated()
1843            } else {
1844                context.shared(true)
1845            };
1846
1847            let handle = context.spawn(|_| async move { 42 });
1848            let result = handle.await;
1849            assert!(matches!(result, Ok(42)));
1850        });
1851    }
1852
1853    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1854    where
1855        R::Context: Spawner + Clock,
1856    {
1857        runner.start(|context| async move {
1858            let context = if dedicated {
1859                context.dedicated()
1860            } else {
1861                context.shared(true)
1862            };
1863
1864            context.clone().spawn(|_| async move {
1865                panic!("blocking task panicked");
1866            });
1867
1868            // Loop until panic
1869            loop {
1870                context.sleep(Duration::from_millis(100)).await;
1871            }
1872        });
1873    }
1874
1875    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1876    where
1877        R::Context: Spawner + Clock,
1878    {
1879        let result: Result<(), Error> = runner.start(|context| async move {
1880            let context = if dedicated {
1881                context.dedicated()
1882            } else {
1883                context.shared(true)
1884            };
1885
1886            let handle = context.clone().spawn(|_| async move {
1887                panic!("blocking task panicked");
1888            });
1889            handle.await
1890        });
1891        assert!(matches!(result, Err(Error::Exited)));
1892    }
1893
1894    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1895        runner.start(|_| async move {
1896            // Setup tracked resource
1897            let dropper = Arc::new(());
1898            let executor = deterministic::Runner::default();
1899            executor.start({
1900                let dropper = dropper.clone();
1901                move |context| async move {
1902                    // Create tasks with circular dependencies through channels
1903                    let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1904                    let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1905                    let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1906
1907                    // Task 1 holds tx2 and waits on rx1
1908                    context.with_label("task1").spawn({
1909                        let mut setup_tx = setup_tx.clone();
1910                        let dropper = dropper.clone();
1911                        move |_| async move {
1912                            // Setup deadlock and mark ready
1913                            tx2.send(()).await.unwrap();
1914                            rx1.next().await.unwrap();
1915                            setup_tx.send(()).await.unwrap();
1916
1917                            // Wait forever
1918                            while rx1.next().await.is_some() {}
1919                            drop(tx2);
1920                            drop(dropper);
1921                        }
1922                    });
1923
1924                    // Task 2 holds tx1 and waits on rx2
1925                    context.with_label("task2").spawn(move |_| async move {
1926                        // Setup deadlock and mark ready
1927                        tx1.send(()).await.unwrap();
1928                        rx2.next().await.unwrap();
1929                        setup_tx.send(()).await.unwrap();
1930
1931                        // Wait forever
1932                        while rx2.next().await.is_some() {}
1933                        drop(tx1);
1934                        drop(dropper);
1935                    });
1936
1937                    // Wait for tasks to start
1938                    setup_rx.next().await.unwrap();
1939                    setup_rx.next().await.unwrap();
1940                }
1941            });
1942
1943            // After runtime drop, both tasks should be cleaned up
1944            Arc::try_unwrap(dropper).expect("references remaining");
1945        });
1946    }
1947
1948    fn test_late_waker<R: Runner>(runner: R)
1949    where
1950        R::Context: Metrics + Spawner,
1951    {
1952        // A future that captures its waker and sends it to the caller, then
1953        // stays pending forever.
1954        struct CaptureWaker {
1955            tx: Option<oneshot::Sender<Waker>>,
1956            sent: bool,
1957        }
1958        impl Future for CaptureWaker {
1959            type Output = ();
1960            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1961                if !self.sent {
1962                    if let Some(tx) = self.tx.take() {
1963                        // Send a clone of the current task's waker to the root
1964                        let _ = tx.send(cx.waker().clone());
1965                    }
1966                    self.sent = true;
1967                }
1968                Poll::Pending
1969            }
1970        }
1971
1972        // A guard that wakes the captured waker on drop.
1973        struct WakeOnDrop(Option<Waker>);
1974        impl Drop for WakeOnDrop {
1975            fn drop(&mut self) {
1976                if let Some(w) = self.0.take() {
1977                    w.wake_by_ref();
1978                }
1979            }
1980        }
1981
1982        // Run the executor to completion
1983        let holder = runner.start(|context| async move {
1984            // Wire a oneshot to receive the task waker.
1985            let (tx, rx) = oneshot::channel::<Waker>();
1986
1987            // Spawn a task that registers its waker and then stays pending.
1988            context
1989                .with_label("capture_waker")
1990                .spawn(move |_| async move {
1991                    CaptureWaker {
1992                        tx: Some(tx),
1993                        sent: false,
1994                    }
1995                    .await;
1996                });
1997
1998            // Ensure the spawned task runs and registers its waker.
1999            utils::reschedule().await;
2000
2001            // Receive the waker from the spawned task.
2002            let waker = rx.await.expect("waker not received");
2003
2004            // Return a guard that will wake after the runtime has dropped.
2005            WakeOnDrop(Some(waker))
2006        });
2007
2008        // Dropping the guard after the runtime has torn down will trigger a wake on
2009        // a task whose executor has been dropped.
2010        drop(holder);
2011    }
2012
2013    fn test_metrics<R: Runner>(runner: R)
2014    where
2015        R::Context: Metrics,
2016    {
2017        runner.start(|context| async move {
2018            // Assert label
2019            assert_eq!(context.label(), "");
2020
2021            // Register a metric
2022            let counter = Counter::<u64>::default();
2023            context.register("test", "test", counter.clone());
2024
2025            // Increment the counter
2026            counter.inc();
2027
2028            // Encode metrics
2029            let buffer = context.encode();
2030            assert!(buffer.contains("test_total 1"));
2031
2032            // Nested context
2033            let context = context.with_label("nested");
2034            let nested_counter = Counter::<u64>::default();
2035            context.register("test", "test", nested_counter.clone());
2036
2037            // Increment the counter
2038            nested_counter.inc();
2039
2040            // Encode metrics
2041            let buffer = context.encode();
2042            assert!(buffer.contains("nested_test_total 1"));
2043            assert!(buffer.contains("test_total 1"));
2044        });
2045    }
2046
2047    fn test_metrics_label<R: Runner>(runner: R)
2048    where
2049        R::Context: Metrics,
2050    {
2051        runner.start(|context| async move {
2052            context.with_label(METRICS_PREFIX);
2053        })
2054    }
2055
2056    fn test_metrics_label_empty<R: Runner>(runner: R)
2057    where
2058        R::Context: Metrics,
2059    {
2060        runner.start(|context| async move {
2061            context.with_label("");
2062        })
2063    }
2064
2065    fn test_metrics_label_invalid_first_char<R: Runner>(runner: R)
2066    where
2067        R::Context: Metrics,
2068    {
2069        runner.start(|context| async move {
2070            context.with_label("1invalid");
2071        })
2072    }
2073
2074    fn test_metrics_label_invalid_char<R: Runner>(runner: R)
2075    where
2076        R::Context: Metrics,
2077    {
2078        runner.start(|context| async move {
2079            context.with_label("invalid-label");
2080        })
2081    }
2082
2083    #[test]
2084    fn test_deterministic_future() {
2085        let runner = deterministic::Runner::default();
2086        test_error_future(runner);
2087    }
2088
2089    #[test]
2090    fn test_deterministic_clock_sleep() {
2091        let executor = deterministic::Runner::default();
2092        test_clock_sleep(executor);
2093    }
2094
2095    #[test]
2096    fn test_deterministic_clock_sleep_until() {
2097        let executor = deterministic::Runner::default();
2098        test_clock_sleep_until(executor);
2099    }
2100
2101    #[test]
2102    fn test_deterministic_clock_timeout() {
2103        let executor = deterministic::Runner::default();
2104        test_clock_timeout(executor);
2105    }
2106
2107    #[test]
2108    fn test_deterministic_root_finishes() {
2109        let executor = deterministic::Runner::default();
2110        test_root_finishes(executor);
2111    }
2112
2113    #[test]
2114    fn test_deterministic_spawn_after_abort() {
2115        let executor = deterministic::Runner::default();
2116        test_spawn_after_abort(executor);
2117    }
2118
2119    #[test]
2120    fn test_deterministic_spawn_abort() {
2121        let executor = deterministic::Runner::default();
2122        test_spawn_abort(executor, false, false);
2123    }
2124
2125    #[test]
2126    #[should_panic(expected = "blah")]
2127    fn test_deterministic_panic_aborts_root() {
2128        let runner = deterministic::Runner::default();
2129        test_panic_aborts_root(runner);
2130    }
2131
2132    #[test]
2133    #[should_panic(expected = "blah")]
2134    fn test_deterministic_panic_aborts_root_caught() {
2135        let cfg = deterministic::Config::default().with_catch_panics(true);
2136        let runner = deterministic::Runner::new(cfg);
2137        test_panic_aborts_root(runner);
2138    }
2139
2140    #[test]
2141    #[should_panic(expected = "blah")]
2142    fn test_deterministic_panic_aborts_spawn() {
2143        let executor = deterministic::Runner::default();
2144        test_panic_aborts_spawn(executor);
2145    }
2146
2147    #[test]
2148    fn test_deterministic_panic_aborts_spawn_caught() {
2149        let cfg = deterministic::Config::default().with_catch_panics(true);
2150        let executor = deterministic::Runner::new(cfg);
2151        test_panic_aborts_spawn_caught(executor);
2152    }
2153
2154    #[test]
2155    #[should_panic(expected = "boom")]
2156    fn test_deterministic_multiple_panics() {
2157        let executor = deterministic::Runner::default();
2158        test_multiple_panics(executor);
2159    }
2160
2161    #[test]
2162    fn test_deterministic_multiple_panics_caught() {
2163        let cfg = deterministic::Config::default().with_catch_panics(true);
2164        let executor = deterministic::Runner::new(cfg);
2165        test_multiple_panics_caught(executor);
2166    }
2167
2168    #[test]
2169    fn test_deterministic_select() {
2170        let executor = deterministic::Runner::default();
2171        test_select(executor);
2172    }
2173
2174    #[test]
2175    fn test_deterministic_select_loop() {
2176        let executor = deterministic::Runner::default();
2177        test_select_loop(executor);
2178    }
2179
2180    #[test]
2181    fn test_deterministic_storage_operations() {
2182        let executor = deterministic::Runner::default();
2183        test_storage_operations(executor);
2184    }
2185
2186    #[test]
2187    fn test_deterministic_blob_read_write() {
2188        let executor = deterministic::Runner::default();
2189        test_blob_read_write(executor);
2190    }
2191
2192    #[test]
2193    fn test_deterministic_blob_resize() {
2194        let executor = deterministic::Runner::default();
2195        test_blob_resize(executor);
2196    }
2197
2198    #[test]
2199    fn test_deterministic_many_partition_read_write() {
2200        let executor = deterministic::Runner::default();
2201        test_many_partition_read_write(executor);
2202    }
2203
2204    #[test]
2205    fn test_deterministic_blob_read_past_length() {
2206        let executor = deterministic::Runner::default();
2207        test_blob_read_past_length(executor);
2208    }
2209
2210    #[test]
2211    fn test_deterministic_blob_clone_and_concurrent_read() {
2212        // Run test
2213        let executor = deterministic::Runner::default();
2214        test_blob_clone_and_concurrent_read(executor);
2215    }
2216
2217    #[test]
2218    fn test_deterministic_shutdown() {
2219        let executor = deterministic::Runner::default();
2220        test_shutdown(executor);
2221    }
2222
2223    #[test]
2224    fn test_deterministic_shutdown_multiple_signals() {
2225        let executor = deterministic::Runner::default();
2226        test_shutdown_multiple_signals(executor);
2227    }
2228
2229    #[test]
2230    fn test_deterministic_shutdown_timeout() {
2231        let executor = deterministic::Runner::default();
2232        test_shutdown_timeout(executor);
2233    }
2234
2235    #[test]
2236    fn test_deterministic_shutdown_multiple_stop_calls() {
2237        let executor = deterministic::Runner::default();
2238        test_shutdown_multiple_stop_calls(executor);
2239    }
2240
2241    #[test]
2242    fn test_deterministic_unfulfilled_shutdown() {
2243        let executor = deterministic::Runner::default();
2244        test_unfulfilled_shutdown(executor);
2245    }
2246
2247    #[test]
2248    fn test_deterministic_spawn_dedicated() {
2249        let executor = deterministic::Runner::default();
2250        test_spawn_dedicated(executor);
2251    }
2252
2253    #[test]
2254    fn test_deterministic_spawn() {
2255        let runner = deterministic::Runner::default();
2256        test_spawn(runner);
2257    }
2258
2259    #[test]
2260    fn test_deterministic_spawn_abort_on_parent_abort() {
2261        let runner = deterministic::Runner::default();
2262        test_spawn_abort_on_parent_abort(runner);
2263    }
2264
2265    #[test]
2266    fn test_deterministic_spawn_abort_on_parent_completion() {
2267        let runner = deterministic::Runner::default();
2268        test_spawn_abort_on_parent_completion(runner);
2269    }
2270
2271    #[test]
2272    fn test_deterministic_spawn_cascading_abort() {
2273        let runner = deterministic::Runner::default();
2274        test_spawn_cascading_abort(runner);
2275    }
2276
2277    #[test]
2278    fn test_deterministic_child_survives_sibling_completion() {
2279        let runner = deterministic::Runner::default();
2280        test_child_survives_sibling_completion(runner);
2281    }
2282
2283    #[test]
2284    fn test_deterministic_spawn_clone_chain() {
2285        let runner = deterministic::Runner::default();
2286        test_spawn_clone_chain(runner);
2287    }
2288
2289    #[test]
2290    fn test_deterministic_spawn_sparse_clone_chain() {
2291        let runner = deterministic::Runner::default();
2292        test_spawn_sparse_clone_chain(runner);
2293    }
2294
2295    #[test]
2296    fn test_deterministic_spawn_blocking() {
2297        for dedicated in [false, true] {
2298            let executor = deterministic::Runner::default();
2299            test_spawn_blocking(executor, dedicated);
2300        }
2301    }
2302
2303    #[test]
2304    #[should_panic(expected = "blocking task panicked")]
2305    fn test_deterministic_spawn_blocking_panic() {
2306        for dedicated in [false, true] {
2307            let executor = deterministic::Runner::default();
2308            test_spawn_blocking_panic(executor, dedicated);
2309        }
2310    }
2311
2312    #[test]
2313    fn test_deterministic_spawn_blocking_panic_caught() {
2314        for dedicated in [false, true] {
2315            let cfg = deterministic::Config::default().with_catch_panics(true);
2316            let executor = deterministic::Runner::new(cfg);
2317            test_spawn_blocking_panic_caught(executor, dedicated);
2318        }
2319    }
2320
2321    #[test]
2322    fn test_deterministic_spawn_blocking_abort() {
2323        for (dedicated, blocking) in [(false, true), (true, false)] {
2324            let executor = deterministic::Runner::default();
2325            test_spawn_abort(executor, dedicated, blocking);
2326        }
2327    }
2328
2329    #[test]
2330    fn test_deterministic_circular_reference_prevents_cleanup() {
2331        let executor = deterministic::Runner::default();
2332        test_circular_reference_prevents_cleanup(executor);
2333    }
2334
2335    #[test]
2336    fn test_deterministic_late_waker() {
2337        let executor = deterministic::Runner::default();
2338        test_late_waker(executor);
2339    }
2340
2341    #[test]
2342    fn test_deterministic_metrics() {
2343        let executor = deterministic::Runner::default();
2344        test_metrics(executor);
2345    }
2346
2347    #[test]
2348    #[should_panic]
2349    fn test_deterministic_metrics_label() {
2350        let executor = deterministic::Runner::default();
2351        test_metrics_label(executor);
2352    }
2353
2354    #[test]
2355    #[should_panic(expected = "label must start with [a-zA-Z]")]
2356    fn test_deterministic_metrics_label_empty() {
2357        let executor = deterministic::Runner::default();
2358        test_metrics_label_empty(executor);
2359    }
2360
2361    #[test]
2362    #[should_panic(expected = "label must start with [a-zA-Z]")]
2363    fn test_deterministic_metrics_label_invalid_first_char() {
2364        let executor = deterministic::Runner::default();
2365        test_metrics_label_invalid_first_char(executor);
2366    }
2367
2368    #[test]
2369    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2370    fn test_deterministic_metrics_label_invalid_char() {
2371        let executor = deterministic::Runner::default();
2372        test_metrics_label_invalid_char(executor);
2373    }
2374
2375    #[test_collect_traces]
2376    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2377        let executor = deterministic::Runner::new(deterministic::Config::default());
2378        executor.start(|context| async move {
2379            context
2380                .with_label("test")
2381                .instrumented()
2382                .spawn(|context| async move {
2383                    tracing::info!(field = "test field", "test log");
2384
2385                    context
2386                        .with_label("inner")
2387                        .instrumented()
2388                        .spawn(|_| async move {
2389                            tracing::info!("inner log");
2390                        })
2391                        .await
2392                        .unwrap();
2393                })
2394                .await
2395                .unwrap();
2396        });
2397
2398        let info_traces = traces.get_by_level(Level::INFO);
2399        assert_eq!(info_traces.len(), 2);
2400
2401        // Outer log (single span)
2402        info_traces
2403            .expect_event_at_index(0, |event| {
2404                event.metadata.expect_content_exact("test log")?;
2405                event.metadata.expect_field_count(1)?;
2406                event.metadata.expect_field_exact("field", "test field")?;
2407                event.expect_span_count(1)?;
2408                event.expect_span_at_index(0, |span| {
2409                    span.expect_content_exact("task")?;
2410                    span.expect_field_count(1)?;
2411                    span.expect_field_exact("name", "test")
2412                })
2413            })
2414            .unwrap();
2415
2416        info_traces
2417            .expect_event_at_index(1, |event| {
2418                event.metadata.expect_content_exact("inner log")?;
2419                event.metadata.expect_field_count(0)?;
2420                event.expect_span_count(1)?;
2421                event.expect_span_at_index(0, |span| {
2422                    span.expect_content_exact("task")?;
2423                    span.expect_field_count(1)?;
2424                    span.expect_field_exact("name", "test_inner")
2425                })
2426            })
2427            .unwrap();
2428    }
2429
2430    #[test]
2431    fn test_deterministic_resolver() {
2432        let executor = deterministic::Runner::default();
2433        executor.start(|context| async move {
2434            // Register DNS mappings
2435            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
2436            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
2437            context.resolver_register("example.com", Some(vec![ip1, ip2]));
2438
2439            // Resolve registered hostname
2440            let addrs = context.resolve("example.com").await.unwrap();
2441            assert_eq!(addrs, vec![ip1, ip2]);
2442
2443            // Resolve unregistered hostname
2444            let result = context.resolve("unknown.com").await;
2445            assert!(matches!(result, Err(Error::ResolveFailed(_))));
2446
2447            // Remove mapping
2448            context.resolver_register("example.com", None);
2449            let result = context.resolve("example.com").await;
2450            assert!(matches!(result, Err(Error::ResolveFailed(_))));
2451        });
2452    }
2453
2454    #[test]
2455    fn test_tokio_error_future() {
2456        let runner = tokio::Runner::default();
2457        test_error_future(runner);
2458    }
2459
2460    #[test]
2461    fn test_tokio_clock_sleep() {
2462        let executor = tokio::Runner::default();
2463        test_clock_sleep(executor);
2464    }
2465
2466    #[test]
2467    fn test_tokio_clock_sleep_until() {
2468        let executor = tokio::Runner::default();
2469        test_clock_sleep_until(executor);
2470    }
2471
2472    #[test]
2473    fn test_tokio_clock_timeout() {
2474        let executor = tokio::Runner::default();
2475        test_clock_timeout(executor);
2476    }
2477
2478    #[test]
2479    fn test_tokio_root_finishes() {
2480        let executor = tokio::Runner::default();
2481        test_root_finishes(executor);
2482    }
2483
2484    #[test]
2485    fn test_tokio_spawn_after_abort() {
2486        let executor = tokio::Runner::default();
2487        test_spawn_after_abort(executor);
2488    }
2489
2490    #[test]
2491    fn test_tokio_spawn_abort() {
2492        let executor = tokio::Runner::default();
2493        test_spawn_abort(executor, false, false);
2494    }
2495
2496    #[test]
2497    #[should_panic(expected = "blah")]
2498    fn test_tokio_panic_aborts_root() {
2499        let executor = tokio::Runner::default();
2500        test_panic_aborts_root(executor);
2501    }
2502
2503    #[test]
2504    #[should_panic(expected = "blah")]
2505    fn test_tokio_panic_aborts_root_caught() {
2506        let cfg = tokio::Config::default().with_catch_panics(true);
2507        let executor = tokio::Runner::new(cfg);
2508        test_panic_aborts_root(executor);
2509    }
2510
2511    #[test]
2512    #[should_panic(expected = "blah")]
2513    fn test_tokio_panic_aborts_spawn() {
2514        let executor = tokio::Runner::default();
2515        test_panic_aborts_spawn(executor);
2516    }
2517
2518    #[test]
2519    fn test_tokio_panic_aborts_spawn_caught() {
2520        let cfg = tokio::Config::default().with_catch_panics(true);
2521        let executor = tokio::Runner::new(cfg);
2522        test_panic_aborts_spawn_caught(executor);
2523    }
2524
2525    #[test]
2526    #[should_panic(expected = "boom")]
2527    fn test_tokio_multiple_panics() {
2528        let executor = tokio::Runner::default();
2529        test_multiple_panics(executor);
2530    }
2531
2532    #[test]
2533    fn test_tokio_multiple_panics_caught() {
2534        let cfg = tokio::Config::default().with_catch_panics(true);
2535        let executor = tokio::Runner::new(cfg);
2536        test_multiple_panics_caught(executor);
2537    }
2538
2539    #[test]
2540    fn test_tokio_select() {
2541        let executor = tokio::Runner::default();
2542        test_select(executor);
2543    }
2544
2545    #[test]
2546    fn test_tokio_select_loop() {
2547        let executor = tokio::Runner::default();
2548        test_select_loop(executor);
2549    }
2550
2551    #[test]
2552    fn test_tokio_storage_operations() {
2553        let executor = tokio::Runner::default();
2554        test_storage_operations(executor);
2555    }
2556
2557    #[test]
2558    fn test_tokio_blob_read_write() {
2559        let executor = tokio::Runner::default();
2560        test_blob_read_write(executor);
2561    }
2562
2563    #[test]
2564    fn test_tokio_blob_resize() {
2565        let executor = tokio::Runner::default();
2566        test_blob_resize(executor);
2567    }
2568
2569    #[test]
2570    fn test_tokio_many_partition_read_write() {
2571        let executor = tokio::Runner::default();
2572        test_many_partition_read_write(executor);
2573    }
2574
2575    #[test]
2576    fn test_tokio_blob_read_past_length() {
2577        let executor = tokio::Runner::default();
2578        test_blob_read_past_length(executor);
2579    }
2580
2581    #[test]
2582    fn test_tokio_blob_clone_and_concurrent_read() {
2583        // Run test
2584        let executor = tokio::Runner::default();
2585        test_blob_clone_and_concurrent_read(executor);
2586    }
2587
2588    #[test]
2589    fn test_tokio_shutdown() {
2590        let executor = tokio::Runner::default();
2591        test_shutdown(executor);
2592    }
2593
2594    #[test]
2595    fn test_tokio_shutdown_multiple_signals() {
2596        let executor = tokio::Runner::default();
2597        test_shutdown_multiple_signals(executor);
2598    }
2599
2600    #[test]
2601    fn test_tokio_shutdown_timeout() {
2602        let executor = tokio::Runner::default();
2603        test_shutdown_timeout(executor);
2604    }
2605
2606    #[test]
2607    fn test_tokio_shutdown_multiple_stop_calls() {
2608        let executor = tokio::Runner::default();
2609        test_shutdown_multiple_stop_calls(executor);
2610    }
2611
2612    #[test]
2613    fn test_tokio_unfulfilled_shutdown() {
2614        let executor = tokio::Runner::default();
2615        test_unfulfilled_shutdown(executor);
2616    }
2617
2618    #[test]
2619    fn test_tokio_spawn_dedicated() {
2620        let executor = tokio::Runner::default();
2621        test_spawn_dedicated(executor);
2622    }
2623
2624    #[test]
2625    fn test_tokio_spawn() {
2626        let runner = tokio::Runner::default();
2627        test_spawn(runner);
2628    }
2629
2630    #[test]
2631    fn test_tokio_spawn_abort_on_parent_abort() {
2632        let runner = tokio::Runner::default();
2633        test_spawn_abort_on_parent_abort(runner);
2634    }
2635
2636    #[test]
2637    fn test_tokio_spawn_abort_on_parent_completion() {
2638        let runner = tokio::Runner::default();
2639        test_spawn_abort_on_parent_completion(runner);
2640    }
2641
2642    #[test]
2643    fn test_tokio_spawn_cascading_abort() {
2644        let runner = tokio::Runner::default();
2645        test_spawn_cascading_abort(runner);
2646    }
2647
2648    #[test]
2649    fn test_tokio_child_survives_sibling_completion() {
2650        let runner = tokio::Runner::default();
2651        test_child_survives_sibling_completion(runner);
2652    }
2653
2654    #[test]
2655    fn test_tokio_spawn_clone_chain() {
2656        let runner = tokio::Runner::default();
2657        test_spawn_clone_chain(runner);
2658    }
2659
2660    #[test]
2661    fn test_tokio_spawn_sparse_clone_chain() {
2662        let runner = tokio::Runner::default();
2663        test_spawn_sparse_clone_chain(runner);
2664    }
2665
2666    #[test]
2667    fn test_tokio_spawn_blocking() {
2668        for dedicated in [false, true] {
2669            let executor = tokio::Runner::default();
2670            test_spawn_blocking(executor, dedicated);
2671        }
2672    }
2673
2674    #[test]
2675    #[should_panic(expected = "blocking task panicked")]
2676    fn test_tokio_spawn_blocking_panic() {
2677        for dedicated in [false, true] {
2678            let executor = tokio::Runner::default();
2679            test_spawn_blocking_panic(executor, dedicated);
2680        }
2681    }
2682
2683    #[test]
2684    fn test_tokio_spawn_blocking_panic_caught() {
2685        for dedicated in [false, true] {
2686            let cfg = tokio::Config::default().with_catch_panics(true);
2687            let executor = tokio::Runner::new(cfg);
2688            test_spawn_blocking_panic_caught(executor, dedicated);
2689        }
2690    }
2691
2692    #[test]
2693    fn test_tokio_spawn_blocking_abort() {
2694        for (dedicated, blocking) in [(false, true), (true, false)] {
2695            let executor = tokio::Runner::default();
2696            test_spawn_abort(executor, dedicated, blocking);
2697        }
2698    }
2699
2700    #[test]
2701    fn test_tokio_circular_reference_prevents_cleanup() {
2702        let executor = tokio::Runner::default();
2703        test_circular_reference_prevents_cleanup(executor);
2704    }
2705
2706    #[test]
2707    fn test_tokio_late_waker() {
2708        let executor = tokio::Runner::default();
2709        test_late_waker(executor);
2710    }
2711
2712    #[test]
2713    fn test_tokio_metrics() {
2714        let executor = tokio::Runner::default();
2715        test_metrics(executor);
2716    }
2717
2718    #[test]
2719    #[should_panic]
2720    fn test_tokio_metrics_label() {
2721        let executor = tokio::Runner::default();
2722        test_metrics_label(executor);
2723    }
2724
2725    #[test]
2726    #[should_panic(expected = "label must start with [a-zA-Z]")]
2727    fn test_tokio_metrics_label_empty() {
2728        let executor = tokio::Runner::default();
2729        test_metrics_label_empty(executor);
2730    }
2731
2732    #[test]
2733    #[should_panic(expected = "label must start with [a-zA-Z]")]
2734    fn test_tokio_metrics_label_invalid_first_char() {
2735        let executor = tokio::Runner::default();
2736        test_metrics_label_invalid_first_char(executor);
2737    }
2738
2739    #[test]
2740    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2741    fn test_tokio_metrics_label_invalid_char() {
2742        let executor = tokio::Runner::default();
2743        test_metrics_label_invalid_char(executor);
2744    }
2745
2746    #[test]
2747    fn test_tokio_process_rss_metric() {
2748        let executor = tokio::Runner::default();
2749        executor.start(|context| async move {
2750            loop {
2751                // Wait for RSS metric to be available
2752                let metrics = context.encode();
2753                if !metrics.contains("runtime_process_rss") {
2754                    context.sleep(Duration::from_millis(100)).await;
2755                    continue;
2756                }
2757
2758                // Verify the RSS value is eventually populated (greater than 0)
2759                for line in metrics.lines() {
2760                    if line.starts_with("runtime_process_rss")
2761                        && !line.starts_with("runtime_process_rss{")
2762                    {
2763                        let parts: Vec<&str> = line.split_whitespace().collect();
2764                        if parts.len() >= 2 {
2765                            let rss_value: i64 =
2766                                parts[1].parse().expect("Failed to parse RSS value");
2767                            if rss_value > 0 {
2768                                return;
2769                            }
2770                        }
2771                    }
2772                }
2773            }
2774        });
2775    }
2776
2777    #[test]
2778    fn test_tokio_telemetry() {
2779        let executor = tokio::Runner::default();
2780        executor.start(|context| async move {
2781            // Define the server address
2782            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2783
2784            // Configure telemetry
2785            tokio::telemetry::init(
2786                context.with_label("metrics"),
2787                tokio::telemetry::Logging {
2788                    level: Level::INFO,
2789                    json: false,
2790                },
2791                Some(address),
2792                None,
2793            );
2794
2795            // Register a test metric
2796            let counter: Counter<u64> = Counter::default();
2797            context.register("test_counter", "Test counter", counter.clone());
2798            counter.inc();
2799
2800            // Helper functions to parse HTTP response
2801            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
2802                let mut line = Vec::new();
2803                loop {
2804                    let mut byte = [0u8; 1];
2805                    stream.recv(&mut byte[..]).await?;
2806                    if byte[0] == b'\n' {
2807                        if line.last() == Some(&b'\r') {
2808                            line.pop(); // Remove trailing \r
2809                        }
2810                        break;
2811                    }
2812                    line.push(byte[0]);
2813                }
2814                String::from_utf8(line).map_err(|_| Error::ReadFailed)
2815            }
2816
2817            async fn read_headers<St: Stream>(
2818                stream: &mut St,
2819            ) -> Result<HashMap<String, String>, Error> {
2820                let mut headers = HashMap::new();
2821                loop {
2822                    let line = read_line(stream).await?;
2823                    if line.is_empty() {
2824                        break;
2825                    }
2826                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
2827                    if parts.len() == 2 {
2828                        headers.insert(parts[0].to_string(), parts[1].to_string());
2829                    }
2830                }
2831                Ok(headers)
2832            }
2833
2834            async fn read_body<St: Stream>(
2835                stream: &mut St,
2836                content_length: usize,
2837            ) -> Result<String, Error> {
2838                let mut read = vec![0; content_length];
2839                stream.recv(&mut read[..]).await?;
2840                String::from_utf8(read).map_err(|_| Error::ReadFailed)
2841            }
2842
2843            // Simulate a client connecting to the server
2844            let client_handle = context
2845                .with_label("client")
2846                .spawn(move |context| async move {
2847                    let (mut sink, mut stream) = loop {
2848                        match context.dial(address).await {
2849                            Ok((sink, stream)) => break (sink, stream),
2850                            Err(e) => {
2851                                // The client may be polled before the server is ready, that's alright!
2852                                error!(err =?e, "failed to connect");
2853                                context.sleep(Duration::from_millis(10)).await;
2854                            }
2855                        }
2856                    };
2857
2858                    // Send a GET request to the server
2859                    let request = format!(
2860                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2861                    );
2862                    sink.send(Bytes::from(request)).await.unwrap();
2863
2864                    // Read and verify the HTTP status line
2865                    let status_line = read_line(&mut stream).await.unwrap();
2866                    assert_eq!(status_line, "HTTP/1.1 200 OK");
2867
2868                    // Read and parse headers
2869                    let headers = read_headers(&mut stream).await.unwrap();
2870                    println!("Headers: {headers:?}");
2871                    let content_length = headers
2872                        .get("content-length")
2873                        .unwrap()
2874                        .parse::<usize>()
2875                        .unwrap();
2876
2877                    // Read and verify the body
2878                    let body = read_body(&mut stream, content_length).await.unwrap();
2879                    assert!(body.contains("test_counter_total 1"));
2880                });
2881
2882            // Wait for the client task to complete
2883            client_handle.await.unwrap();
2884        });
2885    }
2886
2887    #[test]
2888    fn test_tokio_resolver() {
2889        let executor = tokio::Runner::default();
2890        executor.start(|context| async move {
2891            let addrs = context.resolve("localhost").await.unwrap();
2892            assert!(!addrs.is_empty());
2893            for addr in addrs {
2894                assert!(
2895                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
2896                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
2897                );
2898            }
2899        });
2900    }
2901
2902    #[test]
2903    fn test_create_pool_tokio() {
2904        let executor = tokio::Runner::default();
2905        executor.start(|context| async move {
2906            // Create a thread pool with 4 threads
2907            let pool = context.with_label("pool").create_pool(NZUsize!(4)).unwrap();
2908
2909            // Create a vector of numbers
2910            let v: Vec<_> = (0..10000).collect();
2911
2912            // Use the thread pool to sum the numbers
2913            pool.install(|| {
2914                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
2915            });
2916        });
2917    }
2918
2919    #[test]
2920    fn test_create_pool_deterministic() {
2921        let executor = deterministic::Runner::default();
2922        executor.start(|context| async move {
2923            // Create a thread pool with 4 threads
2924            let pool = context.with_label("pool").create_pool(NZUsize!(4)).unwrap();
2925
2926            // Create a vector of numbers
2927            let v: Vec<_> = (0..10000).collect();
2928
2929            // Use the thread pool to sum the numbers
2930            pool.install(|| {
2931                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
2932            });
2933        });
2934    }
2935}