Skip to main content

commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
18
19#![doc(
20    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21    html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
// Stability scopes gate public items by maturity (see the crate-level `# Status`
// section): ALPHA items are experimental, BETA items are more stable.
stability_scope!(ALPHA {
    // Deterministic runtime and mock primitives for testing/simulation.
    pub mod deterministic;
    pub mod mocks;
});
// Benchmark helpers are excluded on wasm32 (cfg-gated; presumably because the
// target lacks the required OS facilities — confirm rationale upstream).
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
    pub mod benchmarks;
});
// io_uring support is compiled only when a corresponding feature is enabled.
stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
    mod iouring;
});
// Production Tokio-backed runtime; unavailable on wasm32.
stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
    pub mod tokio;
});
46stability_scope!(BETA {
47    use commonware_macros::select;
48    use commonware_parallel::{Rayon, ThreadPool};
49    use iobuf::PoolError;
50    use prometheus_client::registry::Metric;
51    use rayon::ThreadPoolBuildError;
52    use std::{
53        future::Future,
54        io::Error as IoError,
55        net::SocketAddr,
56        num::NonZeroUsize,
57        time::{Duration, SystemTime},
58    };
59    use thiserror::Error;
60
    /// Prefix for runtime metrics.
    ///
    /// Reserved for the runtime itself: [Metrics::scoped_label] asserts that
    /// user-provided labels do not start with this prefix.
    pub(crate) const METRICS_PREFIX: &str = "runtime";

    /// Re-export of `Buf` and `BufMut` traits for usage with [I/O buffers](iobuf).
    pub use bytes::{Buf, BufMut};
    /// Re-export of [governor::Quota] for rate limiting configuration.
    pub use governor::Quota;

    // I/O buffer types used throughout the network and storage traits below.
    pub mod iobuf;
    pub use iobuf::{BufferPool, BufferPoolConfig, IoBuf, IoBufMut, IoBufs, IoBufsMut};

    // Utility helpers, re-exported at the crate root.
    pub mod utils;
    pub use utils::*;

    pub mod telemetry;

    /// Default [`Blob`] version used when no version is specified via [`Storage::open`].
    pub const DEFAULT_BLOB_VERSION: u16 = 0;
79
    /// Errors that can occur when interacting with the runtime.
    #[derive(Error, Debug)]
    pub enum Error {
        /// The runtime has exited.
        #[error("exited")]
        Exited,
        /// The resource (e.g. a channel or connection) was closed.
        #[error("closed")]
        Closed,
        /// A timed operation exceeded its deadline (see [Clock::timeout]).
        #[error("timeout")]
        Timeout,
        /// Binding a listener to a socket address failed (see [Network::bind]).
        #[error("bind failed")]
        BindFailed,
        /// Establishing an outbound connection failed (see [Network::dial]).
        #[error("connection failed")]
        ConnectionFailed,
        /// A write operation failed.
        #[error("write failed")]
        WriteFailed,
        /// A read operation failed.
        #[error("read failed")]
        ReadFailed,
        /// Sending a message over a [Sink] failed.
        #[error("send failed")]
        SendFailed,
        /// Receiving a message from a [Stream] failed.
        #[error("recv failed")]
        RecvFailed,
        /// DNS resolution of the contained hostname failed (see [Resolver::resolve]).
        #[error("dns resolution failed: {0}")]
        ResolveFailed(String),
        /// The partition name contains disallowed characters (see [Storage]).
        #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
        PartitionNameInvalid(String),
        /// Creating the named partition failed.
        #[error("partition creation failed: {0}")]
        PartitionCreationFailed(String),
        /// The named partition does not exist.
        #[error("partition missing: {0}")]
        PartitionMissing(String),
        /// The named partition exists but could not be read.
        #[error("partition corrupt: {0}")]
        PartitionCorrupt(String),
        /// Opening blob `{1}` in partition `{0}` failed with the underlying I/O error.
        #[error("blob open failed: {0}/{1} error: {2}")]
        BlobOpenFailed(String, String, IoError),
        /// Blob `{1}` in partition `{0}` does not exist.
        #[error("blob missing: {0}/{1}")]
        BlobMissing(String, String),
        /// Resizing the blob failed with the underlying I/O error.
        #[error("blob resize failed: {0}/{1} error: {2}")]
        BlobResizeFailed(String, String, IoError),
        /// Syncing the blob failed with the underlying I/O error.
        #[error("blob sync failed: {0}/{1} error: {2}")]
        BlobSyncFailed(String, String, IoError),
        /// The blob is shorter than the requested operation requires.
        #[error("blob insufficient length")]
        BlobInsufficientLength,
        /// The blob's contents are corrupt, for the stated reason.
        #[error("blob corrupt: {0}/{1} reason: {2}")]
        BlobCorrupt(String, String, String),
        /// The blob's stored version is outside the range accepted by
        /// [Storage::open_versioned].
        #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
        BlobVersionMismatch {
            expected: std::ops::RangeInclusive<u16>,
            found: u16,
        },
        /// A stored checksum was missing or did not match.
        #[error("invalid or missing checksum")]
        InvalidChecksum,
        /// An offset computation overflowed.
        #[error("offset overflow")]
        OffsetOverflow,
        /// The blob is immutable and cannot be modified.
        #[error("immutable blob")]
        ImmutableBlob,
        /// A general I/O error.
        #[error("io error: {0}")]
        Io(#[from] IoError),
        /// A buffer pool error (see [iobuf::PoolError]).
        #[error("buffer pool: {0}")]
        Pool(#[from] PoolError),
    }
139
    /// Interface that any task scheduler must implement to start
    /// running tasks.
    pub trait Runner {
        /// Context defines the environment available to tasks.
        /// Example of possible services provided by the context include:
        /// - [Clock] for time-based operations
        /// - [Network] for network operations
        /// - [Storage] for storage operations
        type Context;

        /// Start running a root task.
        ///
        /// The closure `f` receives the runtime's [Runner::Context] and returns
        /// the root future; its output is returned to the caller.
        ///
        /// When this function returns, all spawned tasks will be canceled. If clean
        /// shutdown cannot be implemented via `Drop`, consider using [Spawner::stop] and
        /// [Spawner::stopped] to coordinate clean shutdown.
        fn start<F, Fut>(self, f: F) -> Fut::Output
        where
            F: FnOnce(Self::Context) -> Fut,
            Fut: Future;
    }
160
    /// Interface that any task scheduler must implement to spawn tasks.
    pub trait Spawner: Clone + Send + Sync + 'static {
        /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
        ///
        /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
        /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
        /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
        /// [`Spawner::dedicated`] instead.
        ///
        /// The shared executor with `blocking == false` is the default spawn mode.
        fn shared(self, blocking: bool) -> Self;

        /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
        ///
        /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
        /// shared executor.
        ///
        /// This is not the default behavior. See [`Spawner::shared`] for more information.
        fn dedicated(self) -> Self;

        /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
        ///
        /// Like [`Spawner::shared`]/[`Spawner::dedicated`], this configures only the next
        /// spawn (see "Spawn Configuration" on [`Spawner::spawn`]).
        fn instrumented(self) -> Self;

        /// Spawn a task with the current context.
        ///
        /// Unlike directly awaiting a future, the task starts running immediately even if the caller
        /// never awaits the returned [`Handle`].
        ///
        /// # Mandatory Supervision
        ///
        /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
        ///
        /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
        /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
        ///
        /// ```txt
        /// ctx_a
        ///   |
        ///   +-- clone() ---> ctx_c
        ///   |                  |
        ///   |                  +-- spawn() ---> Task C (ctx_d)
        ///   |
        ///   +-- spawn() ---> Task A (ctx_b)
        ///                              |
        ///                              +-- spawn() ---> Task B (ctx_e)
        ///
        /// Task A finishes or aborts --> Task B and Task C are aborted
        /// ```
        ///
        /// # Spawn Configuration
        ///
        /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
        /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
        ///
        /// Child tasks should assume they start from a clean configuration without needing to inspect how their
        /// parent was configured.
        fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
        where
            F: FnOnce(Self) -> Fut + Send + 'static,
            Fut: Future<Output = T> + Send + 'static,
            T: Send + 'static;

        /// Signals the runtime to stop execution and waits for all outstanding tasks
        /// to perform any required cleanup and exit.
        ///
        /// This method does not actually kill any tasks but rather signals to them, using
        /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
        /// It then waits for all [signal::Signal] references to be dropped before returning.
        ///
        /// ## Multiple Stop Calls
        ///
        /// This method is idempotent and safe to call multiple times concurrently (on
        /// different instances of the same context since it consumes `self`). The first
        /// call initiates shutdown with the provided `value`, and all subsequent calls
        /// will wait for the same completion regardless of their `value` parameter, i.e.
        /// the original `value` from the first call is preserved.
        ///
        /// ## Timeout
        ///
        /// If a timeout is provided, the method will return an error if all [signal::Signal]
        /// references have not been dropped within the specified duration.
        fn stop(
            self,
            value: i32,
            timeout: Option<Duration>,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
        /// any task.
        ///
        /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
        /// immediately. The [signal::Signal] returned will always resolve to the value of the
        /// first [Spawner::stop] call.
        fn stopped(&self) -> signal::Signal;
    }
256
257    /// Trait for creating [rayon]-compatible thread pools with each worker thread
258    /// placed on dedicated threads via [Spawner].
259    pub trait ThreadPooler: Spawner + Metrics {
260        /// Creates a clone-able [rayon]-compatible thread pool with [Spawner::spawn].
261        ///
262        /// # Arguments
263        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
264        ///
265        /// # Returns
266        /// A `Result` containing the configured [rayon::ThreadPool] or a [rayon::ThreadPoolBuildError] if the pool cannot
267        /// be built.
268        fn create_thread_pool(
269            &self,
270            concurrency: NonZeroUsize,
271        ) -> Result<ThreadPool, ThreadPoolBuildError>;
272
273        /// Creates a clone-able [Rayon] strategy for use with [commonware_parallel].
274        ///
275        /// # Arguments
276        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
277        ///
278        /// # Returns
279        /// A `Result` containing the configured [Rayon] strategy or a [rayon::ThreadPoolBuildError] if the pool cannot be
280        /// built.
281        fn create_strategy(
282            &self,
283            concurrency: NonZeroUsize,
284        ) -> Result<Rayon, ThreadPoolBuildError> {
285            self.create_thread_pool(concurrency).map(Rayon::with_pool)
286        }
287    }
288
    /// Interface to register and encode metrics.
    pub trait Metrics: Clone + Send + Sync + 'static {
        /// Get the current label of the context.
        fn label(&self) -> String;

        /// Create a new instance of `Metrics` with the given label appended to the end
        /// of the current `Metrics` label.
        ///
        /// This is commonly used to create a nested context for `register`.
        ///
        /// Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. It is not permitted for
        /// any implementation to use `METRICS_PREFIX` as the start of a label (reserved for metrics for the runtime).
        fn with_label(&self, label: &str) -> Self;

        /// Create a new instance of `Metrics` with an additional attribute (key-value pair) applied
        /// to all metrics registered in this context and any child contexts.
        ///
        /// Unlike [`Metrics::with_label`] which affects the metric name prefix, `with_attribute` adds
        /// a key-value pair that appears as a separate dimension in the metric output. This is
        /// useful for instrumenting n-ary data structures in a way that is easy to manage downstream.
        ///
        /// Keys must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. Values can be any string.
        ///
        /// # Labeling Children
        ///
        /// Attributes apply to the entire subtree of contexts. When you call `with_attribute`, the
        /// label is automatically added to all metrics registered in that context and any child
        /// contexts created via `with_label`:
        ///
        /// ```text
        /// context
        ///   |-- with_label("orchestrator")
        ///         |-- with_attribute("epoch", "5")
        ///               |-- counter: votes        -> orchestrator_votes{epoch="5"}
        ///               |-- counter: proposals    -> orchestrator_proposals{epoch="5"}
        ///               |-- with_label("engine")
        ///                     |-- gauge: height   -> orchestrator_engine_height{epoch="5"}
        /// ```
        ///
        /// This pattern avoids wrapping every metric in a `Family` and avoids polluting metric
        /// names with dynamic values like `orchestrator_epoch_5_votes`.
        ///
        /// _Using attributes does not reduce cardinality (N epochs still means N time series).
        /// Attributes just make metrics easier to query, filter, and aggregate._
        ///
        /// # Family Label Conflicts
        ///
        /// When using `Family` metrics, avoid using attribute keys that match the Family's label field names.
        /// If a conflict occurs, the encoded output will contain duplicate labels (e.g., `{env="prod",env="staging"}`),
        /// which is invalid Prometheus format and may cause scraping issues.
        ///
        /// ```ignore
        /// #[derive(EncodeLabelSet)]
        /// struct Labels { env: String }
        ///
        /// // BAD: attribute "env" conflicts with Family field "env"
        /// let ctx = context.with_attribute("env", "prod");
        /// let family: Family<Labels, Counter> = Family::default();
        /// ctx.register("requests", "help", family);
        /// // Produces invalid: requests_total{env="prod",env="staging"}
        ///
        /// // GOOD: use distinct names
        /// let ctx = context.with_attribute("region", "us_east");
        /// // Produces valid: requests_total{region="us_east",env="staging"}
        /// ```
        ///
        /// # Example
        ///
        /// ```ignore
        /// // Instead of creating epoch-specific metric names:
        /// let ctx = context.with_label(&format!("consensus_engine_{}", epoch));
        /// // Produces: consensus_engine_5_votes_total, consensus_engine_6_votes_total, ...
        ///
        /// // Use attributes to add epoch as a label dimension:
        /// let ctx = context.with_label("consensus_engine").with_attribute("epoch", epoch);
        /// // Produces: consensus_engine_votes_total{epoch="5"}, consensus_engine_votes_total{epoch="6"}, ...
        /// ```
        ///
        /// Multiple attributes can be chained:
        /// ```ignore
        /// let ctx = context
        ///     .with_label("engine")
        ///     .with_attribute("region", "us_east")
        ///     .with_attribute("instance", "i1");
        /// // Produces: engine_requests_total{region="us_east",instance="i1"} 42
        /// ```
        ///
        /// # Querying The Latest Attribute
        ///
        /// To query the latest attribute value dynamically, create a gauge to track the current value:
        /// ```ignore
        /// // Create a gauge to track the current epoch
        /// let latest_epoch = Gauge::<i64>::default();
        /// context.with_label("orchestrator").register("latest_epoch", "current epoch", latest_epoch.clone());
        /// latest_epoch.set(current_epoch);
        /// // Produces: orchestrator_latest_epoch 5
        /// ```
        ///
        /// Then create a dashboard variable `$latest_epoch` with query `max(orchestrator_latest_epoch)`
        /// and use it in panel queries: `consensus_engine_votes_total{epoch="$latest_epoch"}`
        fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;

        /// Prefix the given label with the current context's label.
        ///
        /// Unlike `with_label`, this method does not create a new context.
        ///
        /// # Panics
        ///
        /// Panics if the resulting label starts with `METRICS_PREFIX` (reserved
        /// for runtime metrics).
        fn scoped_label(&self, label: &str) -> String {
            let label = if self.label().is_empty() {
                label.to_string()
            } else {
                format!("{}_{}", self.label(), label)
            };
            assert!(
                !label.starts_with(METRICS_PREFIX),
                "using runtime label is not allowed"
            );
            label
        }

        /// Register a metric with the runtime.
        ///
        /// Any registered metric will include (as a prefix) the label of the current context.
        ///
        /// Names must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`.
        fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);

        /// Encode all metrics into a buffer.
        ///
        /// To ensure downstream analytics tools work correctly, users must never duplicate metrics
        /// (via the concatenation of nested `with_label` and `register` calls). This can be avoided
        /// by using `with_label` to create new context instances (ensures all context instances are
        /// namespaced).
        fn encode(&self) -> String;
    }
422
    /// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating single-entity rate limiters.
    /// For per-key rate limiting, use [KeyedRateLimiter].
    pub type RateLimiter<C> = governor::RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;

    /// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating per-peer rate limiters
    /// using governor's [HashMapStateStore].
    ///
    /// Note: [Clock] below implements `governor::clock::Clock`, so runtime
    /// contexts can be used as `C` in both aliases.
    ///
    /// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
    pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
        K,
        governor::state::keyed::HashMapStateStore<K>,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
446
    /// Interface that any task scheduler must implement to provide
    /// time-based operations.
    ///
    /// It is necessary to mock time to provide deterministic execution
    /// of arbitrary tasks.
    pub trait Clock:
        governor::clock::Clock<Instant = SystemTime>
        + governor::clock::ReasonablyRealtime
        + Clone
        + Send
        + Sync
        + 'static
    {
        /// Returns the current time.
        fn current(&self) -> SystemTime;

        /// Sleep for the given duration.
        fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

        /// Sleep until the given deadline.
        fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

        /// Await a future with a timeout, returning `Error::Timeout` if it expires.
        ///
        /// The future is raced against [Clock::sleep] for `duration`; whichever
        /// completes first determines the result.
        ///
        /// # Examples
        ///
        /// ```
        /// use std::time::Duration;
        /// use commonware_runtime::{deterministic, Error, Runner, Clock};
        ///
        /// let executor = deterministic::Runner::default();
        /// executor.start(|context| async move {
        ///     match context
        ///         .timeout(Duration::from_millis(100), async { 42 })
        ///         .await
        ///     {
        ///         Ok(value) => assert_eq!(value, 42),
        ///         Err(Error::Timeout) => panic!("should not timeout"),
        ///         Err(e) => panic!("unexpected error: {:?}", e),
        ///     }
        /// });
        /// ```
        fn timeout<F, T>(
            &self,
            duration: Duration,
            future: F,
        ) -> impl Future<Output = Result<T, Error>> + Send + '_
        where
            F: Future<Output = T> + Send + 'static,
            T: Send + 'static,
        {
            async move {
                // Race completion of the future against the timeout sleep.
                select! {
                    result = future => Ok(result),
                    _ = self.sleep(duration) => Err(Error::Timeout),
                }
            }
        }
    }
506
507    /// Syntactic sugar for the type of [Sink] used by a given [Network] N.
508    pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
509
510    /// Syntactic sugar for the type of [Stream] used by a given [Network] N.
511    pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
512
513    /// Syntactic sugar for the type of [Listener] used by a given [Network] N.
514    pub type ListenerOf<N> = <N as crate::Network>::Listener;
515
    /// Interface that any runtime must implement to create
    /// network connections.
    pub trait Network: Clone + Send + Sync + 'static {
        /// The type of [Listener] that's returned when binding to a socket.
        /// Accepting a connection returns a [Sink] and [Stream] which are defined
        /// by the [Listener] and used to send and receive data over the connection.
        type Listener: Listener;

        /// Bind to the given socket address.
        ///
        /// Returns a [Listener] for accepting inbound connections, or
        /// [Error::BindFailed] on failure.
        fn bind(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

        /// Dial the given socket address.
        ///
        /// On success, returns the ([Sink], [Stream]) pair for the new outbound
        /// connection.
        fn dial(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
    }
536
    /// Interface for DNS resolution.
    pub trait Resolver: Clone + Send + Sync + 'static {
        /// Resolve a hostname to IP addresses.
        ///
        /// Returns a list of IP addresses that the hostname resolves to,
        /// or [Error::ResolveFailed] on failure.
        fn resolve(
            &self,
            host: &str,
        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
    }
547
548    /// Interface that any runtime must implement to handle
549    /// incoming network connections.
550    pub trait Listener: Sync + Send + 'static {
551        /// The type of [Sink] that's returned when accepting a connection.
552        /// This is used to send data to the remote connection.
553        type Sink: Sink;
554        /// The type of [Stream] that's returned when accepting a connection.
555        /// This is used to receive data from the remote connection.
556        type Stream: Stream;
557
558        /// Accept an incoming connection.
559        fn accept(
560            &mut self,
561        ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
562
563        /// Returns the local address of the listener.
564        fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
565    }
566
    /// Interface that any runtime must implement to send
    /// messages over a network connection.
    pub trait Sink: Sync + Send + 'static {
        /// Send a message to the sink.
        ///
        /// Accepts anything convertible into [IoBufs] (e.g. owned bytes or
        /// pooled buffers).
        ///
        /// # Warning
        ///
        /// If the sink returns an error, part of the message may still be delivered.
        fn send(
            &mut self,
            buf: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
580
    /// Interface that any runtime must implement to receive
    /// messages over a network connection.
    pub trait Stream: Sync + Send + 'static {
        /// Receive exactly `len` bytes from the stream.
        ///
        /// The runtime allocates the buffer and returns it as `IoBufs`.
        ///
        /// # Warning
        ///
        /// If the stream returns an error, partially read data may be discarded.
        fn recv(&mut self, len: u64) -> impl Future<Output = Result<IoBufs, Error>> + Send;

        /// Peek at buffered data without consuming.
        ///
        /// Returns up to `max_len` bytes from the internal buffer, or an empty slice
        /// if no data is currently buffered. This does not perform any I/O or block.
        ///
        /// This is useful e.g. for parsing length prefixes without committing to a read
        /// or paying the cost of async.
        fn peek(&self, max_len: u64) -> &[u8];
    }
602
603    /// Interface to interact with storage.
604    ///
605    /// To support storage implementations that enable concurrent reads and
606    /// writes, blobs are responsible for maintaining synchronization.
607    ///
608    /// Storage can be backed by a local filesystem, cloud storage, etc.
609    ///
610    /// # Partition Names
611    ///
612    /// Partition names must be non-empty and contain only ASCII alphanumeric
613    /// characters, dashes (`-`), or underscores (`_`). Names containing other
614    /// characters (e.g., `/`, `.`, spaces) will return an error.
615    pub trait Storage: Clone + Send + Sync + 'static {
616        /// The readable/writeable storage buffer that can be opened by this Storage.
617        type Blob: Blob;
618
619        /// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
620        /// in the versions range. The blob version is omitted from the return value.
621        fn open(
622            &self,
623            partition: &str,
624            name: &[u8],
625        ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
626            async move {
627                let (blob, size, _) = self
628                    .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
629                    .await?;
630                Ok((blob, size))
631            }
632        }
633
634        /// Open an existing blob in a given partition or create a new one, returning
635        /// the blob and its length.
636        ///
637        /// Multiple instances of the same blob can be opened concurrently, however,
638        /// writing to the same blob concurrently may lead to undefined behavior.
639        ///
640        /// An Ok result indicates the blob is durably created (or already exists).
641        ///
642        /// # Versions
643        ///
644        /// Blobs are versioned. If the blob's version is not in `versions`, returns
645        /// [Error::BlobVersionMismatch].
646        ///
647        /// # Returns
648        ///
649        /// A tuple of (blob, logical_size, blob_version).
650        fn open_versioned(
651            &self,
652            partition: &str,
653            name: &[u8],
654            versions: std::ops::RangeInclusive<u16>,
655        ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
656
657        /// Remove a blob from a given partition.
658        ///
659        /// If no `name` is provided, the entire partition is removed.
660        ///
661        /// An Ok result indicates the blob is durably removed.
662        fn remove(
663            &self,
664            partition: &str,
665            name: Option<&[u8]>,
666        ) -> impl Future<Output = Result<(), Error>> + Send;
667
668        /// Return all blobs in a given partition.
669        fn scan(&self, partition: &str)
670            -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
671    }
672
    /// Interface to read and write to a blob.
    ///
    /// To support blob implementations that enable concurrent reads and
    /// writes, blobs are responsible for maintaining synchronization.
    ///
    /// Cloning a blob is similar to wrapping a single file descriptor in
    /// a lock whereas opening a new blob (of the same name) is similar to
    /// opening a new file descriptor. If multiple blobs are opened with the same
    /// name, they are not expected to coordinate access to underlying storage
    /// and writing to both is undefined behavior.
    ///
    /// When a blob is dropped, any unsynced changes may be discarded. Implementations
    /// may attempt to sync during drop but errors will go unhandled. Call `sync`
    /// before dropping to ensure all changes are durably persisted.
    #[allow(clippy::len_without_is_empty)]
    pub trait Blob: Clone + Send + Sync + 'static {
        /// Read into caller-provided buffer(s) at the given offset.
        ///
        /// The caller provides the buffer, and the implementation fills it with data
        /// read from the blob starting at `offset`. Returns the same buffer, filled
        /// with data.
        ///
        /// # Contract
        ///
        /// - The output `IoBufsMut` is the same as the input, with data filled from offset
        /// - The total bytes read equals the total initialized length of the input buffer(s)
        fn read_at(
            &self,
            offset: u64,
            buf: impl Into<IoBufsMut> + Send,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Write `buf` to the blob at the given offset.
        fn write_at(
            &self,
            offset: u64,
            buf: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Resize the blob to the given length.
        ///
        /// If the length is greater than the current length, the blob is extended with zeros.
        /// If the length is less than the current length, the blob is truncated and the
        /// data beyond the new length is discarded.
        fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

        /// Ensure all pending data is durably persisted.
        fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
    }
721
    /// Interface that any runtime must implement to provide buffer pools.
    ///
    /// Distinct pools are exposed for the network and storage subsystems.
    pub trait BufferPooler: Clone + Send + Sync + 'static {
        /// Returns the network [BufferPool].
        fn network_buffer_pool(&self) -> &BufferPool;

        /// Returns the storage [BufferPool].
        fn storage_buffer_pool(&self) -> &BufferPool;
    }
730});
731stability_scope!(ALPHA, cfg(feature = "external") {
    /// Interface that runtimes can implement to constrain the execution latency of a future.
    pub trait Pacer: Clock + Clone + Send + Sync + 'static {
        /// Defer completion of a future until a specified `latency` has elapsed. If the future is
        /// not yet ready at the desired time of completion, the runtime will block until the future
        /// is ready.
        ///
        /// In [crate::deterministic], this is used to ensure interactions with external systems can
        /// be interacted with deterministically. In [crate::tokio], this is a no-op (allows
        /// multiple runtimes to be tested with no code changes).
        ///
        /// # Setting Latency
        ///
        /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
        /// the expected resolution latency of the future. To better explore the possible behavior of an
        /// application, users can set latency to a randomly chosen value in the range of
        /// `[expected latency / 2, expected latency * 2]`.
        ///
        /// # Warning
        ///
        /// Because `pace` blocks if the future is not ready, it is important that the future's completion
        /// doesn't require anything in the current thread to complete (or else it will deadlock).
        fn pace<'a, F, T>(
            &'a self,
            latency: Duration,
            future: F,
        ) -> impl Future<Output = T> + Send + 'a
        where
            F: Future<Output = T> + Send + 'a,
            T: Send + 'a;
    }

    /// Extension trait that makes it more ergonomic to use [Pacer].
    ///
    /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
    /// runtime should delay completion relative to the clock.
    pub trait FutureExt: Future + Send + Sized {
        /// Delay completion of the future until a specified `latency` on `pacer`.
        fn pace<'a, E>(
            self,
            pacer: &'a E,
            latency: Duration,
        ) -> impl Future<Output = Self::Output> + Send + 'a
        where
            E: Pacer + 'a,
            Self: Send + 'a,
            Self::Output: Send + 'a,
        {
            // Forward to the runtime's pacer; all pacing semantics live there.
            pacer.pace(latency, self)
        }
    }

    // Blanket implementation: every `Send` future gets `.pace(...)` for free.
    impl<F> FutureExt for F where F: Future + Send {}
784});
785
786#[cfg(test)]
787mod tests {
788    use super::*;
789    use crate::telemetry::traces::collector::TraceStorage;
790    use bytes::Bytes;
791    use commonware_macros::{select, test_collect_traces};
792    use commonware_utils::{
793        channel::{mpsc, oneshot},
794        NZUsize,
795    };
796    use futures::{
797        future::{pending, ready},
798        join, pin_mut, FutureExt,
799    };
800    use prometheus_client::{
801        encoding::EncodeLabelSet,
802        metrics::{counter::Counter, family::Family},
803    };
804    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
805    use std::{
806        collections::HashMap,
807        net::{IpAddr, Ipv4Addr, Ipv6Addr},
808        pin::Pin,
809        str::FromStr,
810        sync::{
811            atomic::{AtomicU32, Ordering},
812            Arc, Mutex,
813        },
814        task::{Context as TContext, Poll, Waker},
815    };
816    use tracing::{error, Level};
817    use utils::reschedule;
818
819    fn test_error_future<R: Runner>(runner: R) {
820        async fn error_future() -> Result<&'static str, &'static str> {
821            Err("An error occurred")
822        }
823        let result = runner.start(|_| error_future());
824        assert_eq!(result, Err("An error occurred"));
825    }
826
827    fn test_clock_sleep<R: Runner>(runner: R)
828    where
829        R::Context: Spawner + Clock,
830    {
831        runner.start(|context| async move {
832            // Capture initial time
833            let start = context.current();
834            let sleep_duration = Duration::from_millis(10);
835            context.sleep(sleep_duration).await;
836
837            // After run, time should have advanced
838            let end = context.current();
839            assert!(end.duration_since(start).unwrap() >= sleep_duration);
840        });
841    }
842
843    fn test_clock_sleep_until<R: Runner>(runner: R)
844    where
845        R::Context: Spawner + Clock + Metrics,
846    {
847        runner.start(|context| async move {
848            // Trigger sleep
849            let now = context.current();
850            context.sleep_until(now + Duration::from_millis(100)).await;
851
852            // Ensure slept duration has elapsed
853            let elapsed = now.elapsed().unwrap();
854            assert!(elapsed >= Duration::from_millis(100));
855        });
856    }
857
858    fn test_clock_timeout<R: Runner>(runner: R)
859    where
860        R::Context: Spawner + Clock,
861    {
862        runner.start(|context| async move {
863            // Future completes before timeout
864            let result = context
865                .timeout(Duration::from_millis(100), async { "success" })
866                .await;
867            assert_eq!(result.unwrap(), "success");
868
869            // Future exceeds timeout duration
870            let result = context
871                .timeout(Duration::from_millis(50), pending::<()>())
872                .await;
873            assert!(matches!(result, Err(Error::Timeout)));
874
875            // Future completes within timeout
876            let result = context
877                .timeout(
878                    Duration::from_millis(100),
879                    context.sleep(Duration::from_millis(50)),
880                )
881                .await;
882            assert!(result.is_ok());
883        });
884    }
885
886    fn test_root_finishes<R: Runner>(runner: R)
887    where
888        R::Context: Spawner,
889    {
890        runner.start(|context| async move {
891            context.spawn(|_| async move {
892                loop {
893                    reschedule().await;
894                }
895            });
896        });
897    }
898
899    fn test_spawn_after_abort<R>(runner: R)
900    where
901        R: Runner,
902        R::Context: Spawner + Clone,
903    {
904        runner.start(|context| async move {
905            // Create a child context
906            let child = context.clone();
907
908            // Spawn parent and abort
909            let parent_handle = context.spawn(move |_| async move {
910                pending::<()>().await;
911            });
912            parent_handle.abort();
913
914            // Spawn child and ensure it aborts
915            let child_handle = child.spawn(move |_| async move {
916                pending::<()>().await;
917            });
918            assert!(matches!(child_handle.await, Err(Error::Closed)));
919        });
920    }
921
922    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
923    where
924        R::Context: Spawner,
925    {
926        runner.start(|context| async move {
927            let context = if dedicated {
928                assert!(!blocking);
929                context.dedicated()
930            } else {
931                context.shared(blocking)
932            };
933
934            let handle = context.spawn(|_| async move {
935                loop {
936                    reschedule().await;
937                }
938            });
939            handle.abort();
940            assert!(matches!(handle.await, Err(Error::Closed)));
941        });
942    }
943
944    fn test_panic_aborts_root<R: Runner>(runner: R) {
945        let result: Result<(), Error> = runner.start(|_| async move {
946            panic!("blah");
947        });
948        result.unwrap_err();
949    }
950
951    fn test_panic_aborts_spawn<R: Runner>(runner: R)
952    where
953        R::Context: Spawner + Clock,
954    {
955        runner.start(|context| async move {
956            context.clone().spawn(|_| async move {
957                panic!("blah");
958            });
959
960            // Loop until panic
961            loop {
962                context.sleep(Duration::from_millis(100)).await;
963            }
964        });
965    }
966
967    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
968    where
969        R::Context: Spawner + Clock,
970    {
971        let result: Result<(), Error> = runner.start(|context| async move {
972            let result = context.clone().spawn(|_| async move {
973                panic!("blah");
974            });
975            result.await
976        });
977        assert!(matches!(result, Err(Error::Exited)));
978    }
979
980    fn test_multiple_panics<R: Runner>(runner: R)
981    where
982        R::Context: Spawner + Clock,
983    {
984        runner.start(|context| async move {
985            context.clone().spawn(|_| async move {
986                panic!("boom 1");
987            });
988            context.clone().spawn(|_| async move {
989                panic!("boom 2");
990            });
991            context.clone().spawn(|_| async move {
992                panic!("boom 3");
993            });
994
995            // Loop until panic
996            loop {
997                context.sleep(Duration::from_millis(100)).await;
998            }
999        });
1000    }
1001
1002    fn test_multiple_panics_caught<R: Runner>(runner: R)
1003    where
1004        R::Context: Spawner + Clock,
1005    {
1006        let (res1, res2, res3) = runner.start(|context| async move {
1007            let handle1 = context.clone().spawn(|_| async move {
1008                panic!("boom 1");
1009            });
1010            let handle2 = context.clone().spawn(|_| async move {
1011                panic!("boom 2");
1012            });
1013            let handle3 = context.clone().spawn(|_| async move {
1014                panic!("boom 3");
1015            });
1016
1017            join!(handle1, handle2, handle3)
1018        });
1019        assert!(matches!(res1, Err(Error::Exited)));
1020        assert!(matches!(res2, Err(Error::Exited)));
1021        assert!(matches!(res3, Err(Error::Exited)));
1022    }
1023
    /// Verify `select!` resolves to a ready branch and delivers its value.
    ///
    /// NOTE(review): the first assertion relies on the first branch winning
    /// when both branches are immediately ready — assumes `select!` is biased
    /// toward earlier branches; confirm against the macro's documentation.
    fn test_select<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Test first branch
            let output = Mutex::new(0);
            select! {
                v1 = ready(1) => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 1);

            // Test second branch (first branch never resolves)
            select! {
                v1 = std::future::pending::<i32>() => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 2);
        });
    }
1050
    /// Ensure future fusing works as expected.
    ///
    /// Exercises `select!` inside loops: a branch that loses a round (like
    /// `receiver.recv()`) must neither lose queued messages nor be left in a
    /// broken "polled after loss" state on subsequent iterations.
    fn test_select_loop<R: Runner>(runner: R)
    where
        R::Context: Clock,
    {
        runner.start(|context| async move {
            // Should hit timeout (channel is empty, so recv never wins)
            let (sender, mut receiver) = mpsc::unbounded_channel();
            for _ in 0..2 {
                select! {
                    v = receiver.recv() => {
                        panic!("unexpected value: {v:?}");
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }

            // Populate channel
            sender.send(0).unwrap();
            sender.send(1).unwrap();

            // Prefer not reading channel without losing messages
            select! {
                _ = async {} => {
                    // Skip reading from channel even though populated
                },
                v = receiver.recv() => {
                    panic!("unexpected value: {v:?}");
                },
            };

            // Process messages (both queued values must still be delivered,
            // in order, despite the losing rounds above)
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.recv() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
1097
    /// End-to-end storage lifecycle: open, write, sync, read, scan, reopen,
    /// remove a single blob, then remove the whole partition.
    fn test_storage_operations<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob";

            // Open a new blob
            let (blob, size) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0, "new blob should have size 0");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob
            let read = blob
                .read_at(0, IoBufMut::zeroed(data.len()))
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Scan blobs in the partition
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(blobs.contains(&name.to_vec()));

            // Reopen the blob; reported length must match what was written
            let (blob, len) = context
                .open(partition, name)
                .await
                .expect("Failed to reopen blob");
            assert_eq!(len, data.len() as u64);

            // Read data part of message back (offset 7 covers b"Storage")
            let read = blob
                .read_at(7, IoBufMut::zeroed(7))
                .await
                .expect("Failed to read data");
            assert_eq!(read.coalesce(), b"Storage");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Remove the blob
            context
                .remove(partition, Some(name))
                .await
                .expect("Failed to remove blob");

            // Ensure the blob is removed
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(!blobs.contains(&name.to_vec()));

            // Remove the partition
            context
                .remove(partition, None)
                .await
                .expect("Failed to remove partition");

            // Scan the partition (must fail now that it is gone)
            let result = context.scan(partition).await;
            assert!(matches!(result, Err(Error::PartitionMissing(_))));
        });
    }
1180
    /// Verify offset-based reads and writes: adjacent writes compose, a
    /// rewrite at an offset does not change blob length, and reads past the
    /// end fail.
    fn test_blob_read_write<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write data at different offsets (adjacent, total length 10)
            let data1 = b"Hello";
            let data2 = b"World";
            blob.write_at(0, data1)
                .await
                .expect("Failed to write data1");
            blob.write_at(5, data2)
                .await
                .expect("Failed to write data2");

            // Read data back
            let read = blob
                .read_at(0, IoBufMut::zeroed(10))
                .await
                .expect("Failed to read data");
            let read = read.coalesce();
            assert_eq!(&read.as_ref()[..5], data1);
            assert_eq!(&read.as_ref()[5..], data2);

            // Read past end of blob
            let result = blob.read_at(10, IoBufMut::zeroed(10)).await;
            assert!(result.is_err());

            // Rewrite data without affecting length
            let data3 = b"Store";
            blob.write_at(5, data3)
                .await
                .expect("Failed to write data3");

            // Read data back
            let read = blob
                .read_at(0, IoBufMut::zeroed(10))
                .await
                .expect("Failed to read data");
            let read = read.coalesce();
            assert_eq!(&read.as_ref()[..5], data1);
            assert_eq!(&read.as_ref()[5..], data3);

            // Read past end of blob (length must still be 10 after rewrite)
            let result = blob.read_at(10, IoBufMut::zeroed(10)).await;
            assert!(result.is_err());
        });
    }
1238
    /// Verify `resize` both extends a blob (zero-filled) and truncates it,
    /// with the new length observed across reopen.
    fn test_blob_resize<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition_resize";
            let name = b"test_blob_resize";

            // Open and write to a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            let data = b"some data";
            blob.write_at(0, data.to_vec())
                .await
                .expect("Failed to write");
            blob.sync().await.expect("Failed to sync after write");

            // Re-open and check length
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);

            // Resize to extend the file
            let new_len = (data.len() as u64) * 2;
            blob.resize(new_len)
                .await
                .expect("Failed to resize to extend");
            blob.sync().await.expect("Failed to sync after resize");

            // Re-open and check length again
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, new_len);

            // Read original data (must be untouched by the extension)
            let read_buf = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);

            // Read extended part (should be zeros)
            let extended_part = blob
                .read_at(data.len() as u64, IoBufMut::zeroed(data.len()))
                .await
                .unwrap();
            assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());

            // Truncate the blob
            blob.resize(data.len() as u64).await.unwrap();
            blob.sync().await.unwrap();

            // Reopen to check truncation
            let (blob, size) = context.open(partition, name).await.unwrap();
            assert_eq!(size, data.len() as u64);

            // Read truncated data (original bytes survive the truncation)
            let read_buf = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);
            blob.sync().await.unwrap();
        });
    }
1299
    /// Verify blobs with the same name in different partitions are
    /// independent: each partition stores its own data with its own length
    /// (offsets shifted by `additional` per partition).
    fn test_many_partition_read_write<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partitions = ["partition1", "partition2", "partition3"];
            let name = b"test_blob_rw";
            let data1 = b"Hello";
            let data2 = b"World";

            for (additional, partition) in partitions.iter().enumerate() {
                // Open a new blob
                let (blob, _) = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");

                // Write data at different offsets (gap grows per partition)
                blob.write_at(0, data1)
                    .await
                    .expect("Failed to write data1");
                blob.write_at(5 + additional as u64, data2)
                    .await
                    .expect("Failed to write data2");

                // Sync the blob
                blob.sync().await.expect("Failed to sync blob");
            }

            for (additional, partition) in partitions.iter().enumerate() {
                // Open a new blob; each partition kept its distinct length
                let (blob, len) = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");
                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);

                // Read data back
                let read = blob
                    .read_at(0, IoBufMut::zeroed(10 + additional))
                    .await
                    .expect("Failed to read data");
                let read = read.coalesce();
                assert_eq!(&read.as_ref()[..5], b"Hello");
                assert_eq!(&read.as_ref()[5 + additional..], b"World");
            }
        });
    }
1348
1349    fn test_blob_read_past_length<R: Runner>(runner: R)
1350    where
1351        R::Context: Storage,
1352    {
1353        runner.start(|context| async move {
1354            let partition = "test_partition";
1355            let name = b"test_blob_rw";
1356
1357            // Open a new blob
1358            let (blob, _) = context
1359                .open(partition, name)
1360                .await
1361                .expect("Failed to open blob");
1362
1363            // Read data past file length (empty file)
1364            let result = blob.read_at(0, IoBufMut::zeroed(10)).await;
1365            assert!(result.is_err());
1366
1367            // Write data to the blob
1368            let data = b"Hello, Storage!".to_vec();
1369            blob.write_at(0, data)
1370                .await
1371                .expect("Failed to write to blob");
1372
1373            // Read data past file length (non-empty file)
1374            let result = blob.read_at(0, IoBufMut::zeroed(20)).await;
1375            assert!(result.is_err());
1376        })
1377    }
1378
    /// Verify cloned blob handles can serve concurrent reads and that, once
    /// every handle is dropped, the runtime's `open_blobs` metric reads zero.
    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
    where
        R::Context: Spawner + Storage + Metrics,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob in clone (each task owns its own clone)
            let check1 = context.with_label("check1").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, IoBufMut::zeroed(data_len))
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });
            let check2 = context.with_label("check2").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, IoBufMut::zeroed(data_len))
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });

            // Wait for both reads to complete
            let result = join!(check1, check2);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());

            // Read data from the blob (original handle still works)
            let read = blob
                .read_at(0, IoBufMut::zeroed(data.len()))
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Drop the blob (clones were already dropped with their tasks)
            drop(blob);

            // Ensure no blobs still open
            let buffer = context.encode();
            assert!(buffer.contains("open_blobs 0"));
        });
    }
1446
    /// Verify `stop` delivers the kill value to tasks waiting on `stopped()`,
    /// and that `stopped()` resolves immediately for tasks spawned after stop.
    fn test_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 9;
        runner.start(|context| async move {
            // Spawn a task that waits for signal
            let before = context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    // Holding `signal` until here keeps shutdown waiting on us
                    drop(signal);
                });

            // Signal the tasks and wait for them to stop
            let result = context.clone().stop(kill, None).await;
            assert!(result.is_ok());

            // Spawn a task after stop is called
            let after = context
                .with_label("after")
                .spawn(move |context| async move {
                    // A call to `stopped()` after `stop()` resolves immediately
                    let value = context.stopped().await.unwrap();
                    assert_eq!(value, kill);
                });

            // Ensure both tasks complete
            let result = join!(before, after);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
        });
    }
1482
    /// Verify `stop` waits for every outstanding `stopped()` signal to be
    /// dropped, giving all tasks time to finish cleanup before it returns.
    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            let (started_tx, mut started_rx) = mpsc::channel(3);
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn 3 tasks that do cleanup work after receiving stop signal
            // and increment a shared counter
            let task = |cleanup_duration: Duration| {
                let context = context.clone();
                let counter = counter.clone();
                let started_tx = started_tx.clone();
                context.spawn(move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).await.unwrap();

                    // Increment once killed
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    context.sleep(cleanup_duration).await;
                    counter.fetch_add(1, Ordering::SeqCst);

                    // Wait to drop signal until work has been done
                    drop(signal);
                })
            };

            let task1 = task(Duration::from_millis(10));
            let task2 = task(Duration::from_millis(20));
            let task3 = task(Duration::from_millis(30));

            // Give tasks time to start (each acknowledges via the channel)
            for _ in 0..3 {
                started_rx.recv().await.unwrap();
            }

            // Stop and verify all cleanup completed before `stop` returned
            context.stop(kill, None).await.unwrap();
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // Ensure all tasks completed
            let result = join!(task1, task2, task3);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            assert!(result.2.is_ok());
        });
    }
1534
1535    fn test_shutdown_timeout<R: Runner>(runner: R)
1536    where
1537        R::Context: Spawner + Metrics + Clock,
1538    {
1539        let kill = 42;
1540        runner.start(|context| async move {
1541            // Setup startup coordinator
1542            let (started_tx, started_rx) = oneshot::channel();
1543
1544            // Spawn a task that never completes its cleanup
1545            context.clone().spawn(move |context| async move {
1546                let signal = context.stopped();
1547                started_tx.send(()).unwrap();
1548                pending::<()>().await;
1549                signal.await.unwrap();
1550            });
1551
1552            // Try to stop with a timeout
1553            started_rx.await.unwrap();
1554            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1555
1556            // Assert that we got a timeout error
1557            assert!(matches!(result, Err(Error::Timeout)));
1558        });
1559    }
1560
    /// Verify concurrent `stop` calls all block until shutdown completes, that
    /// the first stop value wins, and that post-completion stops return
    /// immediately.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays completion to test timing
            let task = context.with_label("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // Wait for signal to be resolved
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    context.sleep(Duration::from_millis(50)).await;

                    // Increment counter
                    counter.fetch_add(1, Ordering::SeqCst);
                    drop(signal);
                }
            });

            // Give task time to start
            started_rx.await.unwrap();

            // Issue two separate stop calls
            // The second stop call uses a different stop value that should be ignored
            let stop_task1 = context.clone().stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.clone().stop(kill2, None);
            pin_mut!(stop_task2);

            // Both of them should be awaiting completion
            // (`now_or_never` polls once without blocking)
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Wait for both stop calls to complete
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // Verify first stop value wins
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // Wait for blocking task to complete
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Post-completion stop should return immediately
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1622
1623    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1624    where
1625        R::Context: Spawner + Metrics,
1626    {
1627        runner.start(|context| async move {
1628            // Spawn a task that waits for signal
1629            context
1630                .with_label("before")
1631                .spawn(move |context| async move {
1632                    let mut signal = context.stopped();
1633                    let value = (&mut signal).await.unwrap();
1634
1635                    // We should never reach this point
1636                    assert_eq!(value, 42);
1637                    drop(signal);
1638                });
1639
1640            // Ensure waker is registered
1641            reschedule().await;
1642        });
1643    }
1644
1645    fn test_spawn_dedicated<R: Runner>(runner: R)
1646    where
1647        R::Context: Spawner,
1648    {
1649        runner.start(|context| async move {
1650            let handle = context.dedicated().spawn(|_| async move { 42 });
1651            assert!(matches!(handle.await, Ok(42)));
1652        });
1653    }
1654
1655    fn test_spawn<R: Runner>(runner: R)
1656    where
1657        R::Context: Spawner + Clock,
1658    {
1659        runner.start(|context| async move {
1660            let child_handle = Arc::new(Mutex::new(None));
1661            let child_handle2 = child_handle.clone();
1662
1663            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1664            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1665            let parent_handle = context.spawn(move |context| async move {
1666                // Spawn child that completes immediately
1667                let handle = context.spawn(|_| async {});
1668
1669                // Store child handle so we can test it later
1670                *child_handle2.lock().unwrap() = Some(handle);
1671
1672                parent_initialized_tx.send(()).unwrap();
1673
1674                // Parent task completes
1675                parent_complete_rx.await.unwrap();
1676            });
1677
1678            // Wait for parent task to spawn the children
1679            parent_initialized_rx.await.unwrap();
1680
1681            // Child task completes successfully
1682            let child_handle = child_handle.lock().unwrap().take().unwrap();
1683            assert!(child_handle.await.is_ok());
1684
1685            // Complete the parent task
1686            parent_complete_tx.send(()).unwrap();
1687
1688            // Wait for parent task to complete successfully
1689            assert!(parent_handle.await.is_ok());
1690        });
1691    }
1692
1693    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1694    where
1695        R::Context: Spawner + Clock,
1696    {
1697        runner.start(|context| async move {
1698            let child_handle = Arc::new(Mutex::new(None));
1699            let child_handle2 = child_handle.clone();
1700
1701            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1702            let parent_handle = context.spawn(move |context| async move {
1703                // Spawn child task that hangs forever, should be aborted when parent aborts
1704                let handle = context.spawn(|_| pending::<()>());
1705
1706                // Store child task handle so we can test it later
1707                *child_handle2.lock().unwrap() = Some(handle);
1708
1709                parent_initialized_tx.send(()).unwrap();
1710
1711                // Parent task runs until aborted
1712                pending::<()>().await
1713            });
1714
1715            // Wait for parent task to spawn the children
1716            parent_initialized_rx.await.unwrap();
1717
1718            // Abort parent task
1719            parent_handle.abort();
1720            assert!(matches!(parent_handle.await, Err(Error::Closed)));
1721
1722            // Child task should also resolve with error since its parent aborted
1723            let child_handle = child_handle.lock().unwrap().take().unwrap();
1724            assert!(matches!(child_handle.await, Err(Error::Closed)));
1725        });
1726    }
1727
1728    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1729    where
1730        R::Context: Spawner + Clock,
1731    {
1732        runner.start(|context| async move {
1733            let child_handle = Arc::new(Mutex::new(None));
1734            let child_handle2 = child_handle.clone();
1735
1736            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1737            let parent_handle = context.spawn(move |context| async move {
1738                // Spawn child task that hangs forever, should be aborted when parent completes
1739                let handle = context.spawn(|_| pending::<()>());
1740
1741                // Store child task handle so we can test it later
1742                *child_handle2.lock().unwrap() = Some(handle);
1743
1744                // Parent task completes
1745                parent_complete_rx.await.unwrap();
1746            });
1747
1748            // Fire parent completion
1749            parent_complete_tx.send(()).unwrap();
1750
1751            // Wait for parent task to complete
1752            assert!(parent_handle.await.is_ok());
1753
1754            // Child task should resolve with error since its parent has completed
1755            let child_handle = child_handle.lock().unwrap().take().unwrap();
1756            assert!(matches!(child_handle.await, Err(Error::Closed)));
1757        });
1758    }
1759
    /// Verifies that aborting a root task cascades the abort through its full
    /// descendant tree: three children and six grandchildren all resolve with
    /// `Error::Closed` once the root is aborted.
    fn test_spawn_cascading_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // We create the following tree of tasks. All tasks will run
            // indefinitely (until aborted).
            //
            //          root
            //     /     |     \
            //    /      |      \
            //   c0      c1      c2
            //  /  \    /  \    /  \
            // g0  g1  g2  g3  g4  g5
            let c0 = context.clone();
            let g0 = c0.clone();
            let g1 = c0.clone();
            let c1 = context.clone();
            let g2 = c1.clone();
            let g3 = c1.clone();
            let c2 = context.clone();
            let g4 = c2.clone();
            let g5 = c2.clone();

            // Spawn tasks (channel capacity 9 = 3 children + 6 grandchildren,
            // one readiness message per spawned task)
            let handles = Arc::new(Mutex::new(Vec::new()));
            let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
            let root_task = context.spawn({
                let handles = handles.clone();
                move |_| async move {
                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                    {
                        let handle = context.spawn({
                            let handles = handles.clone();
                            let initialized_tx = initialized_tx.clone();
                            move |_| async move {
                                for grandchild in grandchildren {
                                    let handle = grandchild.spawn(|_| async {
                                        pending::<()>().await;
                                    });
                                    handles.lock().unwrap().push(handle);
                                    initialized_tx.send(()).await.unwrap();
                                }

                                pending::<()>().await;
                            }
                        });
                        handles.lock().unwrap().push(handle);
                        initialized_tx.send(()).await.unwrap();
                    }

                    pending::<()>().await;
                }
            });

            // Wait for tasks to initialize
            for _ in 0..9 {
                initialized_rx.recv().await.unwrap();
            }

            // Verify we have all 9 handles (3 children + 6 grandchildren)
            assert_eq!(handles.lock().unwrap().len(), 9);

            // Abort root task
            root_task.abort();
            assert!(matches!(root_task.await, Err(Error::Closed)));

            // All handles should resolve with error due to cascading abort
            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
            for handle in handles {
                assert!(matches!(handle.await, Err(Error::Closed)));
            }
        });
    }
1834
    /// Verifies that completing one child task does not tear down its sibling
    /// or the shared parent: each task lives until explicitly completed.
    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (child_complete_tx, child_complete_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

            let parent = context.spawn(move |context| async move {
                // Spawn a child task
                let child_handle = context.clone().spawn(|_| async move {
                    child_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    child_complete_rx.await.unwrap();
                });
                assert!(
                    child_handle_tx.send(child_handle).is_ok(),
                    "child handle receiver dropped"
                );

                // Spawn an independent sibling task
                let sibling_handle = context.clone().spawn(move |_| async move {
                    sibling_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    sibling_complete_rx.await.unwrap();
                });
                assert!(
                    sibling_handle_tx.send(sibling_handle).is_ok(),
                    "sibling handle receiver dropped"
                );

                // Wait for signal to complete
                parent_complete_rx.await.unwrap();
            });

            // Wait for both to start
            child_started_rx.await.unwrap();
            sibling_started_rx.await.unwrap();

            // Complete the sibling; this must not affect the child or parent
            sibling_complete_tx.send(()).unwrap();
            assert!(sibling_handle_rx.await.is_ok());

            // The child task should still be alive
            child_complete_tx.send(()).unwrap();
            assert!(child_handle_rx.await.is_ok());

            // As well as the parent
            parent_complete_tx.send(()).unwrap();
            assert!(parent.await.is_ok());
        });
    }
1892
    /// Verifies that parent/child task relationships survive context cloning:
    /// aborting the parent cascades to tasks spawned from cloned contexts at
    /// every depth (child and grandchild).
    fn test_spawn_clone_chain<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (parent_started_tx, parent_started_rx) = oneshot::channel();
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();

            // Each generation spawns the next from a cloned context and then
            // parks itself forever; handles are shipped out via oneshots.
            let parent = context.clone().spawn({
                move |context| async move {
                    let child = context.clone().spawn({
                        move |context| async move {
                            let grandchild = context.clone().spawn({
                                move |_| async move {
                                    grandchild_started_tx.send(()).unwrap();
                                    pending::<()>().await;
                                }
                            });
                            assert!(
                                grandchild_handle_tx.send(grandchild).is_ok(),
                                "grandchild handle receiver dropped"
                            );
                            child_started_tx.send(()).unwrap();
                            pending::<()>().await;
                        }
                    });
                    assert!(
                        child_handle_tx.send(child).is_ok(),
                        "child handle receiver dropped"
                    );
                    parent_started_tx.send(()).unwrap();
                    pending::<()>().await;
                }
            });

            // Wait for all three generations to start
            parent_started_rx.await.unwrap();
            child_started_rx.await.unwrap();
            grandchild_started_rx.await.unwrap();

            let child_handle = child_handle_rx.await.unwrap();
            let grandchild_handle = grandchild_handle_rx.await.unwrap();

            // Aborting the parent should cascade to all descendants
            parent.abort();
            assert!(parent.await.is_err());

            assert!(child_handle.await.is_err());
            assert!(grandchild_handle.await.is_err());
        });
    }
1945
1946    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1947    where
1948        R::Context: Spawner + Clock,
1949    {
1950        runner.start(|context| async move {
1951            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1952            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1953
1954            let parent = context.clone().spawn({
1955                move |context| async move {
1956                    let clone1 = context.clone();
1957                    let clone2 = clone1.clone();
1958                    let clone3 = clone2.clone();
1959
1960                    let leaf = clone3.clone().spawn({
1961                        move |_| async move {
1962                            leaf_started_tx.send(()).unwrap();
1963                            pending::<()>().await;
1964                        }
1965                    });
1966
1967                    leaf_handle_tx
1968                        .send(leaf)
1969                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1970                    pending::<()>().await;
1971                }
1972            });
1973
1974            leaf_started_rx.await.unwrap();
1975            let leaf_handle = leaf_handle_rx.await.unwrap();
1976
1977            parent.abort();
1978            assert!(parent.await.is_err());
1979            assert!(leaf_handle.await.is_err());
1980        });
1981    }
1982
1983    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1984    where
1985        R::Context: Spawner,
1986    {
1987        runner.start(|context| async move {
1988            let context = if dedicated {
1989                context.dedicated()
1990            } else {
1991                context.shared(true)
1992            };
1993
1994            let handle = context.spawn(|_| async move { 42 });
1995            let result = handle.await;
1996            assert!(matches!(result, Ok(42)));
1997        });
1998    }
1999
    /// Spawns a blocking task (dedicated thread or shared pool, per
    /// `dedicated`) that panics, then parks the root task in a sleep loop.
    /// The loop never exits on its own: the run only ends when the panic
    /// propagates, so callers presumably invoke this under `#[should_panic]`
    /// — confirm against the call sites.
    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Select the blocking flavor under test
            let context = if dedicated {
                context.dedicated()
            } else {
                context.shared(true)
            };

            context.clone().spawn(|_| async move {
                panic!("blocking task panicked");
            });

            // Loop until panic
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
2021
2022    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2023    where
2024        R::Context: Spawner + Clock,
2025    {
2026        let result: Result<(), Error> = runner.start(|context| async move {
2027            let context = if dedicated {
2028                context.dedicated()
2029            } else {
2030                context.shared(true)
2031            };
2032
2033            let handle = context.clone().spawn(|_| async move {
2034                panic!("blocking task panicked");
2035            });
2036            handle.await
2037        });
2038        assert!(matches!(result, Err(Error::Exited)));
2039    }
2040
    /// Runs a nested deterministic runtime whose tasks hold circularly
    /// dependent channel endpoints (task1 owns tx2 and waits on rx1; task2
    /// owns tx1 and waits on rx2), then checks — via `Arc::try_unwrap` on a
    /// tracked resource — that dropping the nested runtime still releases
    /// both tasks despite the cycle.
    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Setup tracked resource: each task clones `dropper`, so
            // `try_unwrap` below only succeeds if both tasks were freed
            let dropper = Arc::new(());
            let executor = deterministic::Runner::default();
            executor.start({
                let dropper = dropper.clone();
                move |context| async move {
                    // Create tasks with circular dependencies through channels
                    let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                    let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                    let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                    // Task 1 holds tx2 and waits on rx1
                    context.with_label("task1").spawn({
                        let setup_tx = setup_tx.clone();
                        let dropper = dropper.clone();
                        move |_| async move {
                            // Setup deadlock and mark ready
                            tx2.send(()).unwrap();
                            rx1.recv().await.unwrap();
                            setup_tx.send(()).unwrap();

                            // Wait forever (nothing sends on rx1 again); the
                            // drops below are unreachable and exist only to
                            // keep tx2 and dropper owned by this task
                            while rx1.recv().await.is_some() {}
                            drop(tx2);
                            drop(dropper);
                        }
                    });

                    // Task 2 holds tx1 and waits on rx2
                    context.with_label("task2").spawn(move |_| async move {
                        // Setup deadlock and mark ready
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Wait forever
                        while rx2.recv().await.is_some() {}
                        drop(tx1);
                        drop(dropper);
                    });

                    // Wait for tasks to start
                    setup_rx.recv().await.unwrap();
                    setup_rx.recv().await.unwrap();
                }
            });

            // After runtime drop, both tasks should be cleaned up
            Arc::try_unwrap(dropper).expect("references remaining");
        });
    }
2094
    /// Verifies that waking a task after its runtime has been torn down is
    /// safe: a spawned task's waker is captured and handed to the caller,
    /// and only invoked (via drop of the returned guard) after
    /// `runner.start` has returned.
    fn test_late_waker<R: Runner>(runner: R)
    where
        R::Context: Metrics + Spawner,
    {
        // A future that captures its waker and sends it to the caller, then
        // stays pending forever.
        struct CaptureWaker {
            tx: Option<oneshot::Sender<Waker>>,
            sent: bool,
        }
        impl Future for CaptureWaker {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
                // Only send on the first poll; later polls just stay pending
                if !self.sent {
                    if let Some(tx) = self.tx.take() {
                        // Send a clone of the current task's waker to the root
                        let _ = tx.send(cx.waker().clone());
                    }
                    self.sent = true;
                }
                Poll::Pending
            }
        }

        // A guard that wakes the captured waker on drop.
        struct WakeOnDrop(Option<Waker>);
        impl Drop for WakeOnDrop {
            fn drop(&mut self) {
                if let Some(w) = self.0.take() {
                    w.wake_by_ref();
                }
            }
        }

        // Run the executor to completion
        let holder = runner.start(|context| async move {
            // Wire a oneshot to receive the task waker.
            let (tx, rx) = oneshot::channel::<Waker>();

            // Spawn a task that registers its waker and then stays pending.
            context
                .with_label("capture_waker")
                .spawn(move |_| async move {
                    CaptureWaker {
                        tx: Some(tx),
                        sent: false,
                    }
                    .await;
                });

            // Ensure the spawned task runs and registers its waker.
            utils::reschedule().await;

            // Receive the waker from the spawned task.
            let waker = rx.await.expect("waker not received");

            // Return a guard that will wake after the runtime has dropped.
            WakeOnDrop(Some(waker))
        });

        // Dropping the guard after the runtime has torn down will trigger a wake on
        // a task whose executor has been dropped.
        drop(holder);
    }
2159
2160    fn test_metrics<R: Runner>(runner: R)
2161    where
2162        R::Context: Metrics,
2163    {
2164        runner.start(|context| async move {
2165            // Assert label
2166            assert_eq!(context.label(), "");
2167
2168            // Register a metric
2169            let counter = Counter::<u64>::default();
2170            context.register("test", "test", counter.clone());
2171
2172            // Increment the counter
2173            counter.inc();
2174
2175            // Encode metrics
2176            let buffer = context.encode();
2177            assert!(buffer.contains("test_total 1"));
2178
2179            // Nested context
2180            let context = context.with_label("nested");
2181            let nested_counter = Counter::<u64>::default();
2182            context.register("test", "test", nested_counter.clone());
2183
2184            // Increment the counter
2185            nested_counter.inc();
2186
2187            // Encode metrics
2188            let buffer = context.encode();
2189            assert!(buffer.contains("nested_test_total 1"));
2190            assert!(buffer.contains("test_total 1"));
2191        });
2192    }
2193
    /// Verifies metric attributes: they render as metric labels, distinct
    /// attribute values on the same metric name coexist under a single
    /// HELP/TYPE header, and multiple attributes are emitted in sorted order.
    fn test_metrics_with_attribute<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Create context with an attribute
            let ctx_epoch5 = context
                .with_label("consensus")
                .with_attribute("epoch", "e5");

            // Register a metric with the attribute
            let counter = Counter::<u64>::default();
            ctx_epoch5.register("votes", "vote count", counter.clone());
            counter.inc();

            // Encode and verify the attribute appears as a label
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch attribute, got: {}",
                buffer
            );

            // Create context with different epoch attribute (same metric name)
            let ctx_epoch6 = context
                .with_label("consensus")
                .with_attribute("epoch", "e6");
            let counter2 = Counter::<u64>::default();
            ctx_epoch6.register("votes", "vote count", counter2.clone());
            counter2.inc();
            counter2.inc();

            // Both should appear in encoded output with canonical format (single HELP/TYPE)
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch=e5, got: {}",
                buffer
            );
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
                "Expected metric with epoch=e6, got: {}",
                buffer
            );

            // Verify canonical format: HELP and TYPE should appear exactly once
            assert_eq!(
                buffer.matches("# HELP consensus_votes").count(),
                1,
                "HELP should appear exactly once, got: {}",
                buffer
            );
            assert_eq!(
                buffer.matches("# TYPE consensus_votes").count(),
                1,
                "TYPE should appear exactly once, got: {}",
                buffer
            );

            // Multiple attributes should be emitted sorted by attribute name
            let ctx_multi = context
                .with_label("engine")
                .with_attribute("region", "us")
                .with_attribute("instance", "i1");
            let counter3 = Counter::<u64>::default();
            ctx_multi.register("requests", "request count", counter3.clone());
            counter3.inc();

            let buffer = context.encode();
            assert!(
                buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
                "Expected metric with sorted attributes, got: {}",
                buffer
            );
        });
    }
2270
2271    #[test]
2272    fn test_deterministic_metrics_with_attribute() {
2273        let executor = deterministic::Runner::default();
2274        test_metrics_with_attribute(executor);
2275    }
2276
2277    #[test]
2278    fn test_tokio_metrics_with_attribute() {
2279        let runner = tokio::Runner::default();
2280        test_metrics_with_attribute(runner);
2281    }
2282
2283    fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2284    where
2285        R::Context: Metrics,
2286    {
2287        runner.start(|context| async move {
2288            // Create context with attribute, then nest a label
2289            let ctx = context
2290                .with_label("orchestrator")
2291                .with_attribute("epoch", "e5")
2292                .with_label("engine");
2293
2294            // Register a metric
2295            let counter = Counter::<u64>::default();
2296            ctx.register("votes", "vote count", counter.clone());
2297            counter.inc();
2298
2299            // Verify the attribute is preserved through the nested label
2300            let buffer = context.encode();
2301            assert!(
2302                buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2303                "Expected metric with preserved epoch attribute, got: {}",
2304                buffer
2305            );
2306
2307            // Multiple levels of nesting with attributes at different levels
2308            let ctx2 = context
2309                .with_label("outer")
2310                .with_attribute("region", "us")
2311                .with_label("middle")
2312                .with_attribute("az", "east")
2313                .with_label("inner");
2314
2315            let counter2 = Counter::<u64>::default();
2316            ctx2.register("requests", "request count", counter2.clone());
2317            counter2.inc();
2318            counter2.inc();
2319
2320            let buffer = context.encode();
2321            assert!(
2322                buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2323                "Expected metric with all attributes preserved and sorted, got: {}",
2324                buffer
2325            );
2326        });
2327    }
2328
2329    #[test]
2330    fn test_deterministic_metrics_attribute_with_nested_label() {
2331        let executor = deterministic::Runner::default();
2332        test_metrics_attribute_with_nested_label(executor);
2333    }
2334
2335    #[test]
2336    fn test_tokio_metrics_attribute_with_nested_label() {
2337        let runner = tokio::Runner::default();
2338        test_metrics_attribute_with_nested_label(runner);
2339    }
2340
    /// Verifies that attributes set on one sub-context never leak into
    /// metrics registered on a different sub-context, even when the
    /// registrations are interleaved across contexts.
    fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Create two separate sub-contexts, each with their own attribute
            let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
            let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);

            // Register metrics in ctx_a
            let c1 = Counter::<u64>::default();
            ctx_a.register("requests", "help", c1);

            // Register metrics in ctx_b
            let c2 = Counter::<u64>::default();
            ctx_b.register("requests", "help", c2);

            // Register another metric in ctx_a AFTER ctx_b was used to prove
            // ctx_b's attribute did not contaminate ctx_a
            let c3 = Counter::<u64>::default();
            ctx_a.register("errors", "help", c3);

            let output = context.encode();

            // ctx_a metrics should only have epoch=1
            assert!(
                output.contains("component_a_requests_total{epoch=\"1\"} 0"),
                "ctx_a requests should have epoch=1: {output}"
            );
            assert!(
                output.contains("component_a_errors_total{epoch=\"1\"} 0"),
                "ctx_a errors should have epoch=1: {output}"
            );
            assert!(
                !output.contains("component_a_requests_total{epoch=\"2\"}"),
                "ctx_a requests should not have epoch=2: {output}"
            );

            // ctx_b metrics should only have epoch=2
            assert!(
                output.contains("component_b_requests_total{epoch=\"2\"} 0"),
                "ctx_b should have epoch=2: {output}"
            );
            assert!(
                !output.contains("component_b_requests_total{epoch=\"1\"}"),
                "ctx_b should not have epoch=1: {output}"
            );
        });
    }
2389
2390    #[test]
2391    fn test_deterministic_metrics_attributes_isolated_between_contexts() {
2392        let executor = deterministic::Runner::default();
2393        test_metrics_attributes_isolated_between_contexts(executor);
2394    }
2395
2396    #[test]
2397    fn test_tokio_metrics_attributes_isolated_between_contexts() {
2398        let runner = tokio::Runner::default();
2399        test_metrics_attributes_isolated_between_contexts(runner);
2400    }
2401
2402    fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2403    where
2404        R::Context: Metrics,
2405    {
2406        runner.start(|context| async move {
2407            // Create two contexts with same attributes but different order
2408            let ctx_ab = context
2409                .with_label("service")
2410                .with_attribute("region", "us")
2411                .with_attribute("env", "prod");
2412
2413            let ctx_ba = context
2414                .with_label("service")
2415                .with_attribute("env", "prod")
2416                .with_attribute("region", "us");
2417
2418            // Register via first context
2419            let c1 = Counter::<u64>::default();
2420            ctx_ab.register("requests", "help", c1.clone());
2421            c1.inc();
2422
2423            // Register via second context - same attributes, different metric
2424            let c2 = Counter::<u64>::default();
2425            ctx_ba.register("errors", "help", c2.clone());
2426            c2.inc();
2427            c2.inc();
2428
2429            let output = context.encode();
2430
2431            // Both should have the same label order (alphabetically sorted: env, region)
2432            assert!(
2433                output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2434                "requests should have sorted labels: {output}"
2435            );
2436            assert!(
2437                output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2438                "errors should have sorted labels: {output}"
2439            );
2440
2441            // Should NOT have reverse order
2442            assert!(
2443                !output.contains("region=\"us\",env=\"prod\""),
2444                "should not have unsorted label order: {output}"
2445            );
2446        });
2447    }
2448
2449    #[test]
2450    fn test_deterministic_metrics_attributes_sorted_deterministically() {
2451        let executor = deterministic::Runner::default();
2452        test_metrics_attributes_sorted_deterministically(executor);
2453    }
2454
2455    #[test]
2456    fn test_tokio_metrics_attributes_sorted_deterministically() {
2457        let runner = tokio::Runner::default();
2458        test_metrics_attributes_sorted_deterministically(runner);
2459    }
2460
    // Exercises combinations of nested labels and attributes across several
    // "services" registered against one root context, then checks the encoded
    // output line by line: attributed and plain variants of the same label
    // coexist, attributes never leak across sibling contexts, and nested
    // labels are joined with underscores (e.g. service_b_worker_tasks).
    fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Service A: plain, no nested labels
            let svc_a = context.with_label("service_a");

            // Service A with attribute (same top-level label, different context)
            let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);

            // Service B with nested label: service_b_worker
            let svc_b_worker = context.with_label("service_b").with_label("worker");

            // Service B with nested label AND attribute
            let svc_b_worker_shard = context
                .with_label("service_b")
                .with_label("worker")
                .with_attribute("shard", 99);

            // Service B different nested label: service_b_manager
            let svc_b_manager = context.with_label("service_b").with_label("manager");

            // Service C: plain, proves no cross-service contamination
            let svc_c = context.with_label("service_c");

            // Register metrics in all contexts (none are incremented: every
            // assertion below expects a value of 0)
            let c1 = Counter::<u64>::default();
            svc_a.register("requests", "help", c1);

            let c2 = Counter::<u64>::default();
            svc_a_v2.register("requests", "help", c2);

            let c3 = Counter::<u64>::default();
            svc_b_worker.register("tasks", "help", c3);

            let c4 = Counter::<u64>::default();
            svc_b_worker_shard.register("tasks", "help", c4);

            let c5 = Counter::<u64>::default();
            svc_b_manager.register("decisions", "help", c5);

            let c6 = Counter::<u64>::default();
            svc_c.register("requests", "help", c6);

            let output = context.encode();

            // Service A plain and attributed both exist independently
            assert!(
                output.contains("service_a_requests_total 0"),
                "svc_a plain should exist: {output}"
            );
            assert!(
                output.contains("service_a_requests_total{version=\"2\"} 0"),
                "svc_a_v2 should have version=2: {output}"
            );

            // Service B worker: plain and attributed versions
            assert!(
                output.contains("service_b_worker_tasks_total 0"),
                "svc_b_worker plain should exist: {output}"
            );
            assert!(
                output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
                "svc_b_worker_shard should have shard=99: {output}"
            );

            // Service B manager: no attributes
            assert!(
                output.contains("service_b_manager_decisions_total 0"),
                "svc_b_manager should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_b_manager_decisions_total{"),
                "svc_b_manager should have no attributes at all: {output}"
            );

            // Service C: no attributes, no contamination
            assert!(
                output.contains("service_c_requests_total 0"),
                "svc_c should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{"),
                "svc_c should have no attributes at all: {output}"
            );

            // Cross-contamination checks
            assert!(
                !output.contains("service_b_manager_decisions_total{shard="),
                "svc_b_manager should not have shard: {output}"
            );
            assert!(
                !output.contains("service_a_requests_total{shard="),
                "svc_a should not have shard: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{version="),
                "svc_c should not have version: {output}"
            );
        });
    }
2563
2564    #[test]
2565    fn test_deterministic_metrics_nested_labels_with_attributes() {
2566        let executor = deterministic::Runner::default();
2567        test_metrics_nested_labels_with_attributes(executor);
2568    }
2569
2570    #[test]
2571    fn test_tokio_metrics_nested_labels_with_attributes() {
2572        let runner = tokio::Runner::default();
2573        test_metrics_nested_labels_with_attributes(runner);
2574    }
2575
    // Verifies how context attributes compose with `Family` metrics: the
    // context's attributes (alphabetically sorted) are prepended to each
    // series' own labels, while a context without attributes emits only the
    // Family's per-series labels.
    fn test_metrics_family_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Per-series label set for the Family; `u16` status is rendered as
            // a quoted string in the encoded output (status="200").
            #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
            struct RequestLabels {
                method: String,
                status: u16,
            }

            // Create context with attribute
            let ctx = context
                .with_label("api")
                .with_attribute("region", "us_east")
                .with_attribute("env", "prod");

            // Register a Family metric
            let requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx.register("requests", "HTTP requests", requests.clone());

            // Increment counters for different label combinations
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 200,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "POST".to_string(),
                    status: 201,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 404,
                })
                .inc();

            let output = context.encode();

            // Context attributes appear first (alphabetically sorted), then Family labels
            // Context attributes: env="prod", region="us_east"
            // Family labels: method, status
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
                ),
                "GET 200 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
                ),
                "POST 201 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
                ),
                "GET 404 should have merged labels: {output}"
            );

            // Create another context WITHOUT attributes to verify isolation
            let ctx_plain = context.with_label("api_plain");
            let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx_plain.register("requests", "HTTP requests", plain_requests.clone());

            plain_requests
                .get_or_create(&RequestLabels {
                    method: "DELETE".to_string(),
                    status: 204,
                })
                .inc();

            // Re-encode after the second registration.
            let output = context.encode();

            // Plain context should have Family labels but no context attributes
            assert!(
                output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
                "plain DELETE should have only family labels: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{env="),
                "plain should not have env attribute: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{region="),
                "plain should not have region attribute: {output}"
            );
        });
    }
2670
2671    #[test]
2672    fn test_deterministic_metrics_family_with_attributes() {
2673        let executor = deterministic::Runner::default();
2674        test_metrics_family_with_attributes(executor);
2675    }
2676
2677    #[test]
2678    fn test_tokio_metrics_family_with_attributes() {
2679        let runner = tokio::Runner::default();
2680        test_metrics_family_with_attributes(runner);
2681    }
2682
2683    #[test]
2684    fn test_deterministic_future() {
2685        let runner = deterministic::Runner::default();
2686        test_error_future(runner);
2687    }
2688
2689    #[test]
2690    fn test_deterministic_clock_sleep() {
2691        let executor = deterministic::Runner::default();
2692        test_clock_sleep(executor);
2693    }
2694
2695    #[test]
2696    fn test_deterministic_clock_sleep_until() {
2697        let executor = deterministic::Runner::default();
2698        test_clock_sleep_until(executor);
2699    }
2700
2701    #[test]
2702    fn test_deterministic_clock_timeout() {
2703        let executor = deterministic::Runner::default();
2704        test_clock_timeout(executor);
2705    }
2706
2707    #[test]
2708    fn test_deterministic_root_finishes() {
2709        let executor = deterministic::Runner::default();
2710        test_root_finishes(executor);
2711    }
2712
2713    #[test]
2714    fn test_deterministic_spawn_after_abort() {
2715        let executor = deterministic::Runner::default();
2716        test_spawn_after_abort(executor);
2717    }
2718
2719    #[test]
2720    fn test_deterministic_spawn_abort() {
2721        let executor = deterministic::Runner::default();
2722        test_spawn_abort(executor, false, false);
2723    }
2724
2725    #[test]
2726    #[should_panic(expected = "blah")]
2727    fn test_deterministic_panic_aborts_root() {
2728        let runner = deterministic::Runner::default();
2729        test_panic_aborts_root(runner);
2730    }
2731
2732    #[test]
2733    #[should_panic(expected = "blah")]
2734    fn test_deterministic_panic_aborts_root_caught() {
2735        let cfg = deterministic::Config::default().with_catch_panics(true);
2736        let runner = deterministic::Runner::new(cfg);
2737        test_panic_aborts_root(runner);
2738    }
2739
2740    #[test]
2741    #[should_panic(expected = "blah")]
2742    fn test_deterministic_panic_aborts_spawn() {
2743        let executor = deterministic::Runner::default();
2744        test_panic_aborts_spawn(executor);
2745    }
2746
2747    #[test]
2748    fn test_deterministic_panic_aborts_spawn_caught() {
2749        let cfg = deterministic::Config::default().with_catch_panics(true);
2750        let executor = deterministic::Runner::new(cfg);
2751        test_panic_aborts_spawn_caught(executor);
2752    }
2753
2754    #[test]
2755    #[should_panic(expected = "boom")]
2756    fn test_deterministic_multiple_panics() {
2757        let executor = deterministic::Runner::default();
2758        test_multiple_panics(executor);
2759    }
2760
2761    #[test]
2762    fn test_deterministic_multiple_panics_caught() {
2763        let cfg = deterministic::Config::default().with_catch_panics(true);
2764        let executor = deterministic::Runner::new(cfg);
2765        test_multiple_panics_caught(executor);
2766    }
2767
2768    #[test]
2769    fn test_deterministic_select() {
2770        let executor = deterministic::Runner::default();
2771        test_select(executor);
2772    }
2773
2774    #[test]
2775    fn test_deterministic_select_loop() {
2776        let executor = deterministic::Runner::default();
2777        test_select_loop(executor);
2778    }
2779
2780    #[test]
2781    fn test_deterministic_storage_operations() {
2782        let executor = deterministic::Runner::default();
2783        test_storage_operations(executor);
2784    }
2785
2786    #[test]
2787    fn test_deterministic_blob_read_write() {
2788        let executor = deterministic::Runner::default();
2789        test_blob_read_write(executor);
2790    }
2791
2792    #[test]
2793    fn test_deterministic_blob_resize() {
2794        let executor = deterministic::Runner::default();
2795        test_blob_resize(executor);
2796    }
2797
2798    #[test]
2799    fn test_deterministic_many_partition_read_write() {
2800        let executor = deterministic::Runner::default();
2801        test_many_partition_read_write(executor);
2802    }
2803
2804    #[test]
2805    fn test_deterministic_blob_read_past_length() {
2806        let executor = deterministic::Runner::default();
2807        test_blob_read_past_length(executor);
2808    }
2809
2810    #[test]
2811    fn test_deterministic_blob_clone_and_concurrent_read() {
2812        // Run test
2813        let executor = deterministic::Runner::default();
2814        test_blob_clone_and_concurrent_read(executor);
2815    }
2816
2817    #[test]
2818    fn test_deterministic_shutdown() {
2819        let executor = deterministic::Runner::default();
2820        test_shutdown(executor);
2821    }
2822
2823    #[test]
2824    fn test_deterministic_shutdown_multiple_signals() {
2825        let executor = deterministic::Runner::default();
2826        test_shutdown_multiple_signals(executor);
2827    }
2828
2829    #[test]
2830    fn test_deterministic_shutdown_timeout() {
2831        let executor = deterministic::Runner::default();
2832        test_shutdown_timeout(executor);
2833    }
2834
2835    #[test]
2836    fn test_deterministic_shutdown_multiple_stop_calls() {
2837        let executor = deterministic::Runner::default();
2838        test_shutdown_multiple_stop_calls(executor);
2839    }
2840
2841    #[test]
2842    fn test_deterministic_unfulfilled_shutdown() {
2843        let executor = deterministic::Runner::default();
2844        test_unfulfilled_shutdown(executor);
2845    }
2846
2847    #[test]
2848    fn test_deterministic_spawn_dedicated() {
2849        let executor = deterministic::Runner::default();
2850        test_spawn_dedicated(executor);
2851    }
2852
2853    #[test]
2854    fn test_deterministic_spawn() {
2855        let runner = deterministic::Runner::default();
2856        test_spawn(runner);
2857    }
2858
2859    #[test]
2860    fn test_deterministic_spawn_abort_on_parent_abort() {
2861        let runner = deterministic::Runner::default();
2862        test_spawn_abort_on_parent_abort(runner);
2863    }
2864
2865    #[test]
2866    fn test_deterministic_spawn_abort_on_parent_completion() {
2867        let runner = deterministic::Runner::default();
2868        test_spawn_abort_on_parent_completion(runner);
2869    }
2870
2871    #[test]
2872    fn test_deterministic_spawn_cascading_abort() {
2873        let runner = deterministic::Runner::default();
2874        test_spawn_cascading_abort(runner);
2875    }
2876
2877    #[test]
2878    fn test_deterministic_child_survives_sibling_completion() {
2879        let runner = deterministic::Runner::default();
2880        test_child_survives_sibling_completion(runner);
2881    }
2882
2883    #[test]
2884    fn test_deterministic_spawn_clone_chain() {
2885        let runner = deterministic::Runner::default();
2886        test_spawn_clone_chain(runner);
2887    }
2888
2889    #[test]
2890    fn test_deterministic_spawn_sparse_clone_chain() {
2891        let runner = deterministic::Runner::default();
2892        test_spawn_sparse_clone_chain(runner);
2893    }
2894
2895    #[test]
2896    fn test_deterministic_spawn_blocking() {
2897        for dedicated in [false, true] {
2898            let executor = deterministic::Runner::default();
2899            test_spawn_blocking(executor, dedicated);
2900        }
2901    }
2902
2903    #[test]
2904    #[should_panic(expected = "blocking task panicked")]
2905    fn test_deterministic_spawn_blocking_panic() {
2906        for dedicated in [false, true] {
2907            let executor = deterministic::Runner::default();
2908            test_spawn_blocking_panic(executor, dedicated);
2909        }
2910    }
2911
2912    #[test]
2913    fn test_deterministic_spawn_blocking_panic_caught() {
2914        for dedicated in [false, true] {
2915            let cfg = deterministic::Config::default().with_catch_panics(true);
2916            let executor = deterministic::Runner::new(cfg);
2917            test_spawn_blocking_panic_caught(executor, dedicated);
2918        }
2919    }
2920
2921    #[test]
2922    fn test_deterministic_spawn_blocking_abort() {
2923        for (dedicated, blocking) in [(false, true), (true, false)] {
2924            let executor = deterministic::Runner::default();
2925            test_spawn_abort(executor, dedicated, blocking);
2926        }
2927    }
2928
2929    #[test]
2930    fn test_deterministic_circular_reference_prevents_cleanup() {
2931        let executor = deterministic::Runner::default();
2932        test_circular_reference_prevents_cleanup(executor);
2933    }
2934
2935    #[test]
2936    fn test_deterministic_late_waker() {
2937        let executor = deterministic::Runner::default();
2938        test_late_waker(executor);
2939    }
2940
2941    #[test]
2942    fn test_deterministic_metrics() {
2943        let executor = deterministic::Runner::default();
2944        test_metrics(executor);
2945    }
2946
    // Verifies that `instrumented()` wraps each spawned task in a tracing span
    // named "task" carrying a `name` field equal to the context's label chain,
    // and that nested spawns concatenate labels with an underscore
    // ("test" -> "test_inner"), as pinned by the span assertions below.
    #[test_collect_traces]
    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
        let executor = deterministic::Runner::new(deterministic::Config::default());
        executor.start(|context| async move {
            context
                .with_label("test")
                .instrumented()
                .spawn(|context| async move {
                    tracing::info!(field = "test field", "test log");

                    // Nested instrumented task: awaited so both INFO events are
                    // recorded before the root future completes.
                    context
                        .with_label("inner")
                        .instrumented()
                        .spawn(|_| async move {
                            tracing::info!("inner log");
                        })
                        .await
                        .unwrap();
                })
                .await
                .unwrap();
        });

        // Exactly two INFO events: the outer and the inner log, in that order.
        let info_traces = traces.get_by_level(Level::INFO);
        assert_eq!(info_traces.len(), 2);

        // Outer log (single span)
        info_traces
            .expect_event_at_index(0, |event| {
                event.metadata.expect_content_exact("test log")?;
                event.metadata.expect_field_count(1)?;
                event.metadata.expect_field_exact("field", "test field")?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test")
                })
            })
            .unwrap();

        // Inner log: no event fields, and its single span carries the
        // concatenated parent/child label.
        info_traces
            .expect_event_at_index(1, |event| {
                event.metadata.expect_content_exact("inner log")?;
                event.metadata.expect_field_count(0)?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test_inner")
                })
            })
            .unwrap();
    }
3001
3002    #[test]
3003    fn test_deterministic_resolver() {
3004        let executor = deterministic::Runner::default();
3005        executor.start(|context| async move {
3006            // Register DNS mappings
3007            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3008            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
3009            context.resolver_register("example.com", Some(vec![ip1, ip2]));
3010
3011            // Resolve registered hostname
3012            let addrs = context.resolve("example.com").await.unwrap();
3013            assert_eq!(addrs, vec![ip1, ip2]);
3014
3015            // Resolve unregistered hostname
3016            let result = context.resolve("unknown.com").await;
3017            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3018
3019            // Remove mapping
3020            context.resolver_register("example.com", None);
3021            let result = context.resolve("example.com").await;
3022            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3023        });
3024    }
3025
3026    #[test]
3027    fn test_tokio_error_future() {
3028        let runner = tokio::Runner::default();
3029        test_error_future(runner);
3030    }
3031
3032    #[test]
3033    fn test_tokio_clock_sleep() {
3034        let executor = tokio::Runner::default();
3035        test_clock_sleep(executor);
3036    }
3037
3038    #[test]
3039    fn test_tokio_clock_sleep_until() {
3040        let executor = tokio::Runner::default();
3041        test_clock_sleep_until(executor);
3042    }
3043
3044    #[test]
3045    fn test_tokio_clock_timeout() {
3046        let executor = tokio::Runner::default();
3047        test_clock_timeout(executor);
3048    }
3049
3050    #[test]
3051    fn test_tokio_root_finishes() {
3052        let executor = tokio::Runner::default();
3053        test_root_finishes(executor);
3054    }
3055
3056    #[test]
3057    fn test_tokio_spawn_after_abort() {
3058        let executor = tokio::Runner::default();
3059        test_spawn_after_abort(executor);
3060    }
3061
3062    #[test]
3063    fn test_tokio_spawn_abort() {
3064        let executor = tokio::Runner::default();
3065        test_spawn_abort(executor, false, false);
3066    }
3067
3068    #[test]
3069    #[should_panic(expected = "blah")]
3070    fn test_tokio_panic_aborts_root() {
3071        let executor = tokio::Runner::default();
3072        test_panic_aborts_root(executor);
3073    }
3074
3075    #[test]
3076    #[should_panic(expected = "blah")]
3077    fn test_tokio_panic_aborts_root_caught() {
3078        let cfg = tokio::Config::default().with_catch_panics(true);
3079        let executor = tokio::Runner::new(cfg);
3080        test_panic_aborts_root(executor);
3081    }
3082
3083    #[test]
3084    #[should_panic(expected = "blah")]
3085    fn test_tokio_panic_aborts_spawn() {
3086        let executor = tokio::Runner::default();
3087        test_panic_aborts_spawn(executor);
3088    }
3089
3090    #[test]
3091    fn test_tokio_panic_aborts_spawn_caught() {
3092        let cfg = tokio::Config::default().with_catch_panics(true);
3093        let executor = tokio::Runner::new(cfg);
3094        test_panic_aborts_spawn_caught(executor);
3095    }
3096
3097    #[test]
3098    #[should_panic(expected = "boom")]
3099    fn test_tokio_multiple_panics() {
3100        let executor = tokio::Runner::default();
3101        test_multiple_panics(executor);
3102    }
3103
3104    #[test]
3105    fn test_tokio_multiple_panics_caught() {
3106        let cfg = tokio::Config::default().with_catch_panics(true);
3107        let executor = tokio::Runner::new(cfg);
3108        test_multiple_panics_caught(executor);
3109    }
3110
3111    #[test]
3112    fn test_tokio_select() {
3113        let executor = tokio::Runner::default();
3114        test_select(executor);
3115    }
3116
3117    #[test]
3118    fn test_tokio_select_loop() {
3119        let executor = tokio::Runner::default();
3120        test_select_loop(executor);
3121    }
3122
3123    #[test]
3124    fn test_tokio_storage_operations() {
3125        let executor = tokio::Runner::default();
3126        test_storage_operations(executor);
3127    }
3128
3129    #[test]
3130    fn test_tokio_blob_read_write() {
3131        let executor = tokio::Runner::default();
3132        test_blob_read_write(executor);
3133    }
3134
3135    #[test]
3136    fn test_tokio_blob_resize() {
3137        let executor = tokio::Runner::default();
3138        test_blob_resize(executor);
3139    }
3140
3141    #[test]
3142    fn test_tokio_many_partition_read_write() {
3143        let executor = tokio::Runner::default();
3144        test_many_partition_read_write(executor);
3145    }
3146
3147    #[test]
3148    fn test_tokio_blob_read_past_length() {
3149        let executor = tokio::Runner::default();
3150        test_blob_read_past_length(executor);
3151    }
3152
3153    #[test]
3154    fn test_tokio_blob_clone_and_concurrent_read() {
3155        // Run test
3156        let executor = tokio::Runner::default();
3157        test_blob_clone_and_concurrent_read(executor);
3158    }
3159
3160    #[test]
3161    fn test_tokio_shutdown() {
3162        let executor = tokio::Runner::default();
3163        test_shutdown(executor);
3164    }
3165
3166    #[test]
3167    fn test_tokio_shutdown_multiple_signals() {
3168        let executor = tokio::Runner::default();
3169        test_shutdown_multiple_signals(executor);
3170    }
3171
3172    #[test]
3173    fn test_tokio_shutdown_timeout() {
3174        let executor = tokio::Runner::default();
3175        test_shutdown_timeout(executor);
3176    }
3177
3178    #[test]
3179    fn test_tokio_shutdown_multiple_stop_calls() {
3180        let executor = tokio::Runner::default();
3181        test_shutdown_multiple_stop_calls(executor);
3182    }
3183
3184    #[test]
3185    fn test_tokio_unfulfilled_shutdown() {
3186        let executor = tokio::Runner::default();
3187        test_unfulfilled_shutdown(executor);
3188    }
3189
3190    #[test]
3191    fn test_tokio_spawn_dedicated() {
3192        let executor = tokio::Runner::default();
3193        test_spawn_dedicated(executor);
3194    }
3195
3196    #[test]
3197    fn test_tokio_spawn() {
3198        let runner = tokio::Runner::default();
3199        test_spawn(runner);
3200    }
3201
3202    #[test]
3203    fn test_tokio_spawn_abort_on_parent_abort() {
3204        let runner = tokio::Runner::default();
3205        test_spawn_abort_on_parent_abort(runner);
3206    }
3207
3208    #[test]
3209    fn test_tokio_spawn_abort_on_parent_completion() {
3210        let runner = tokio::Runner::default();
3211        test_spawn_abort_on_parent_completion(runner);
3212    }
3213
3214    #[test]
3215    fn test_tokio_spawn_cascading_abort() {
3216        let runner = tokio::Runner::default();
3217        test_spawn_cascading_abort(runner);
3218    }
3219
3220    #[test]
3221    fn test_tokio_child_survives_sibling_completion() {
3222        let runner = tokio::Runner::default();
3223        test_child_survives_sibling_completion(runner);
3224    }
3225
3226    #[test]
3227    fn test_tokio_spawn_clone_chain() {
3228        let runner = tokio::Runner::default();
3229        test_spawn_clone_chain(runner);
3230    }
3231
3232    #[test]
3233    fn test_tokio_spawn_sparse_clone_chain() {
3234        let runner = tokio::Runner::default();
3235        test_spawn_sparse_clone_chain(runner);
3236    }
3237
3238    #[test]
3239    fn test_tokio_spawn_blocking() {
3240        for dedicated in [false, true] {
3241            let executor = tokio::Runner::default();
3242            test_spawn_blocking(executor, dedicated);
3243        }
3244    }
3245
3246    #[test]
3247    #[should_panic(expected = "blocking task panicked")]
3248    fn test_tokio_spawn_blocking_panic() {
3249        for dedicated in [false, true] {
3250            let executor = tokio::Runner::default();
3251            test_spawn_blocking_panic(executor, dedicated);
3252        }
3253    }
3254
3255    #[test]
3256    fn test_tokio_spawn_blocking_panic_caught() {
3257        for dedicated in [false, true] {
3258            let cfg = tokio::Config::default().with_catch_panics(true);
3259            let executor = tokio::Runner::new(cfg);
3260            test_spawn_blocking_panic_caught(executor, dedicated);
3261        }
3262    }
3263
3264    #[test]
3265    fn test_tokio_spawn_blocking_abort() {
3266        for (dedicated, blocking) in [(false, true), (true, false)] {
3267            let executor = tokio::Runner::default();
3268            test_spawn_abort(executor, dedicated, blocking);
3269        }
3270    }
3271
3272    #[test]
3273    fn test_tokio_circular_reference_prevents_cleanup() {
3274        let executor = tokio::Runner::default();
3275        test_circular_reference_prevents_cleanup(executor);
3276    }
3277
3278    #[test]
3279    fn test_tokio_late_waker() {
3280        let executor = tokio::Runner::default();
3281        test_late_waker(executor);
3282    }
3283
3284    #[test]
3285    fn test_tokio_metrics() {
3286        let executor = tokio::Runner::default();
3287        test_metrics(executor);
3288    }
3289
3290    #[test]
3291    fn test_tokio_process_rss_metric() {
3292        let executor = tokio::Runner::default();
3293        executor.start(|context| async move {
3294            loop {
3295                // Wait for RSS metric to be available
3296                let metrics = context.encode();
3297                if !metrics.contains("runtime_process_rss") {
3298                    context.sleep(Duration::from_millis(100)).await;
3299                    continue;
3300                }
3301
3302                // Verify the RSS value is eventually populated (greater than 0)
3303                for line in metrics.lines() {
3304                    if line.starts_with("runtime_process_rss")
3305                        && !line.starts_with("runtime_process_rss{")
3306                    {
3307                        let parts: Vec<&str> = line.split_whitespace().collect();
3308                        if parts.len() >= 2 {
3309                            let rss_value: i64 =
3310                                parts[1].parse().expect("Failed to parse RSS value");
3311                            if rss_value > 0 {
3312                                return;
3313                            }
3314                        }
3315                    }
3316                }
3317            }
3318        });
3319    }
3320
    /// End-to-end telemetry test on the tokio runtime: initialize telemetry
    /// with a metrics HTTP endpoint, register a counter, then connect as a
    /// raw-socket HTTP client and verify the counter appears in the
    /// `/metrics` response body.
    #[test]
    fn test_tokio_telemetry() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            // Define the server address
            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

            // Configure telemetry
            tokio::telemetry::init(
                context.with_label("metrics"),
                tokio::telemetry::Logging {
                    level: Level::INFO,
                    json: false,
                },
                Some(address),
                None,
            );

            // Register a test metric
            let counter: Counter<u64> = Counter::default();
            context.register("test_counter", "Test counter", counter.clone());
            counter.inc();

            // Helper functions to parse HTTP response

            // Read a single CRLF-terminated line from the stream, one byte at
            // a time, returning it without the trailing `\r\n`.
            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
                let mut line = Vec::new();
                loop {
                    let received = stream.recv(1).await?;
                    let byte = received.coalesce().as_ref()[0];
                    if byte == b'\n' {
                        if line.last() == Some(&b'\r') {
                            line.pop(); // Remove trailing \r
                        }
                        break;
                    }
                    line.push(byte);
                }
                String::from_utf8(line).map_err(|_| Error::ReadFailed)
            }

            // Read HTTP headers line-by-line until the blank line that ends
            // the header section; malformed lines (no ": " separator) are
            // silently skipped.
            async fn read_headers<St: Stream>(
                stream: &mut St,
            ) -> Result<HashMap<String, String>, Error> {
                let mut headers = HashMap::new();
                loop {
                    let line = read_line(stream).await?;
                    if line.is_empty() {
                        break;
                    }
                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
                    if parts.len() == 2 {
                        headers.insert(parts[0].to_string(), parts[1].to_string());
                    }
                }
                Ok(headers)
            }

            // Read exactly `content_length` bytes of body and decode as UTF-8.
            async fn read_body<St: Stream>(
                stream: &mut St,
                content_length: usize,
            ) -> Result<String, Error> {
                let received = stream.recv(content_length as u64).await?;
                String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
            }

            // Simulate a client connecting to the server
            let client_handle = context
                .with_label("client")
                .spawn(move |context| async move {
                    // Retry dialing until the metrics server is accepting
                    // connections (startup races the client task).
                    let (mut sink, mut stream) = loop {
                        match context.dial(address).await {
                            Ok((sink, stream)) => break (sink, stream),
                            Err(e) => {
                                // The client may be polled before the server is ready, that's alright!
                                error!(err =?e, "failed to connect");
                                context.sleep(Duration::from_millis(10)).await;
                            }
                        }
                    };

                    // Send a GET request to the server
                    let request = format!(
                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                    );
                    sink.send(Bytes::from(request)).await.unwrap();

                    // Read and verify the HTTP status line
                    let status_line = read_line(&mut stream).await.unwrap();
                    assert_eq!(status_line, "HTTP/1.1 200 OK");

                    // Read and parse headers
                    let headers = read_headers(&mut stream).await.unwrap();
                    println!("Headers: {headers:?}");
                    // NOTE(review): assumes the server always emits a
                    // lowercase `content-length` header — confirm against the
                    // telemetry server implementation.
                    let content_length = headers
                        .get("content-length")
                        .unwrap()
                        .parse::<usize>()
                        .unwrap();

                    // Read and verify the body
                    let body = read_body(&mut stream, content_length).await.unwrap();
                    assert!(body.contains("test_counter_total 1"));
                });

            // Wait for the client task to complete
            client_handle.await.unwrap();
        });
    }
3429
3430    #[test]
3431    fn test_tokio_resolver() {
3432        let executor = tokio::Runner::default();
3433        executor.start(|context| async move {
3434            let addrs = context.resolve("localhost").await.unwrap();
3435            assert!(!addrs.is_empty());
3436            for addr in addrs {
3437                assert!(
3438                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
3439                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
3440                );
3441            }
3442        });
3443    }
3444
3445    #[test]
3446    fn test_create_thread_pool_tokio() {
3447        let executor = tokio::Runner::default();
3448        executor.start(|context| async move {
3449            // Create a thread pool with 4 threads
3450            let pool = context
3451                .with_label("pool")
3452                .create_thread_pool(NZUsize!(4))
3453                .unwrap();
3454
3455            // Create a vector of numbers
3456            let v: Vec<_> = (0..10000).collect();
3457
3458            // Use the thread pool to sum the numbers
3459            pool.install(|| {
3460                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3461            });
3462        });
3463    }
3464
3465    #[test]
3466    fn test_create_thread_pool_deterministic() {
3467        let executor = deterministic::Runner::default();
3468        executor.start(|context| async move {
3469            // Create a thread pool with 4 threads
3470            let pool = context
3471                .with_label("pool")
3472                .create_thread_pool(NZUsize!(4))
3473                .unwrap();
3474
3475            // Create a vector of numbers
3476            let v: Vec<_> = (0..10000).collect();
3477
3478            // Use the thread pool to sum the numbers
3479            pool.install(|| {
3480                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3481            });
3482        });
3483    }
3484
3485    fn test_buffer_pooler<R: Runner>(runner: R)
3486    where
3487        R::Context: BufferPooler,
3488    {
3489        runner.start(|context| async move {
3490            // Verify network pool is accessible and works (cache-line aligned)
3491            let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
3492            assert!(net_buf.capacity() >= 1024);
3493
3494            // Verify storage pool is accessible and works (page-aligned)
3495            let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
3496            assert!(storage_buf.capacity() >= 4096);
3497
3498            // Verify pools have expected configurations
3499            assert_eq!(
3500                context.network_buffer_pool().config().max_per_class.get(),
3501                4096
3502            );
3503            assert_eq!(
3504                context.storage_buffer_pool().config().max_per_class.get(),
3505                32
3506            );
3507        });
3508    }
3509
3510    #[test]
3511    fn test_deterministic_buffer_pooler() {
3512        let runner = deterministic::Runner::default();
3513        test_buffer_pooler(runner);
3514    }
3515
3516    #[test]
3517    fn test_tokio_buffer_pooler() {
3518        let runner = tokio::Runner::default();
3519        test_buffer_pooler(runner);
3520    }
3521}