// commonware_runtime/lib.rs
1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
18
19#![doc(
20    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21    html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
// Internal macros used throughout the crate (declared first so later modules
// can expand them).
#[macro_use]
mod macros;

// Private implementation modules shared by the public runtimes.
mod network;
mod process;
mod storage;

// Public modules, grouped by stability level (see the crate-level `# Status`
// section). `stability_scope!` gates each group behind the corresponding
// stability guarantee and optional `cfg` predicate.
stability_scope!(ALPHA {
    pub mod deterministic;
    pub mod mocks;
});
// Benchmark helpers are not built for wasm32 targets.
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
    pub mod benchmarks;
});
// io_uring backend, compiled only when an iouring feature is enabled.
stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
    mod iouring;
});
// Production Tokio-backed runtime (not available on wasm32).
stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
    pub mod tokio;
});
46stability_scope!(BETA {
47    use commonware_macros::select;
48    use commonware_parallel::{Rayon, ThreadPool};
49    use iobuf::PoolError;
50    use prometheus_client::registry::Metric;
51    use rayon::ThreadPoolBuildError;
52    use std::{
53        future::Future,
54        io::Error as IoError,
55        net::SocketAddr,
56        num::NonZeroUsize,
57        time::{Duration, SystemTime},
58    };
59    use thiserror::Error;
60
    /// Prefix for runtime metrics.
    ///
    /// Reserved for metrics registered by the runtime itself: [Metrics::with_label]
    /// implementations must not accept labels that start with this prefix.
    pub(crate) const METRICS_PREFIX: &str = "runtime";

    /// Re-export of `Buf` and `BufMut` traits for usage with [I/O buffers](iobuf).
    pub use bytes::{Buf, BufMut};
    /// Re-export of [governor::Quota] for rate limiting configuration.
    pub use governor::Quota;

    pub mod iobuf;
    pub use iobuf::{BufferPool, BufferPoolConfig, IoBuf, IoBufMut, IoBufs, IoBufsMut};

    pub mod utils;
    pub use utils::*;

    pub mod telemetry;

    /// Default [`Blob`] version used when no version is specified via [`Storage::open`].
    pub const DEFAULT_BLOB_VERSION: u16 = 0;
79
    /// Errors that can occur when interacting with the runtime.
    ///
    /// Variants are grouped by the subsystem that produces them: task
    /// lifecycle/time, networking, and storage (partitions and blobs).
    #[derive(Error, Debug)]
    pub enum Error {
        // Task lifecycle and time.
        #[error("exited")]
        Exited,
        #[error("closed")]
        Closed,
        #[error("timeout")]
        Timeout,
        // Networking.
        #[error("bind failed")]
        BindFailed,
        #[error("connection failed")]
        ConnectionFailed,
        #[error("write failed")]
        WriteFailed,
        #[error("read failed")]
        ReadFailed,
        #[error("send failed")]
        SendFailed,
        #[error("recv failed")]
        RecvFailed,
        #[error("dns resolution failed: {0}")]
        ResolveFailed(String),
        // Storage: partitions.
        #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
        PartitionNameInvalid(String),
        #[error("partition creation failed: {0}")]
        PartitionCreationFailed(String),
        #[error("partition missing: {0}")]
        PartitionMissing(String),
        #[error("partition corrupt: {0}")]
        PartitionCorrupt(String),
        // Storage: blobs. Per the format strings, the `String` fields carry the
        // partition name and blob name.
        #[error("blob open failed: {0}/{1} error: {2}")]
        BlobOpenFailed(String, String, IoError),
        #[error("blob missing: {0}/{1}")]
        BlobMissing(String, String),
        #[error("blob resize failed: {0}/{1} error: {2}")]
        BlobResizeFailed(String, String, IoError),
        #[error("blob sync failed: {0}/{1} error: {2}")]
        BlobSyncFailed(String, String, IoError),
        #[error("blob insufficient length")]
        BlobInsufficientLength,
        #[error("blob corrupt: {0}/{1} reason: {2}")]
        BlobCorrupt(String, String, String),
        /// Returned when a blob's on-disk version is outside the accepted range
        /// (see [Storage::open_versioned]).
        #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
        BlobVersionMismatch {
            expected: std::ops::RangeInclusive<u16>,
            found: u16,
        },
        // Miscellaneous I/O and buffer errors.
        #[error("invalid or missing checksum")]
        InvalidChecksum,
        #[error("offset overflow")]
        OffsetOverflow,
        #[error("io error: {0}")]
        Io(#[from] IoError),
        #[error("buffer pool: {0}")]
        Pool(#[from] PoolError),
    }
137
    /// Interface that any task scheduler must implement to start
    /// running tasks.
    pub trait Runner {
        /// Context defines the environment available to tasks.
        /// Examples of possible services provided by the context include:
        /// - [Clock] for time-based operations
        /// - [Network] for network operations
        /// - [Storage] for storage operations
        type Context;

        /// Start running a root task.
        ///
        /// Consumes the runner, drives the future produced by `f` to completion,
        /// and returns that future's output.
        ///
        /// When this function returns, all spawned tasks will be canceled. If clean
        /// shutdown cannot be implemented via `Drop`, consider using [Spawner::stop] and
        /// [Spawner::stopped] to coordinate clean shutdown.
        fn start<F, Fut>(self, f: F) -> Fut::Output
        where
            F: FnOnce(Self::Context) -> Fut,
            Fut: Future;
    }
158
    /// Interface that any task scheduler must implement to spawn tasks.
    pub trait Spawner: Clone + Send + Sync + 'static {
        /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
        ///
        /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
        /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
        /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
        /// [`Spawner::dedicated`] instead.
        ///
        /// The shared executor with `blocking == false` is the default spawn mode.
        fn shared(self, blocking: bool) -> Self;

        /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
        ///
        /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
        /// shared executor.
        ///
        /// This is not the default behavior. See [`Spawner::shared`] for more information.
        fn dedicated(self) -> Self;

        /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
        fn instrumented(self) -> Self;

        /// Spawn a task with the current context.
        ///
        /// Unlike directly awaiting a future, the task starts running immediately even if the caller
        /// never awaits the returned [`Handle`]. Await the [`Handle`] to retrieve the task's output.
        ///
        /// # Mandatory Supervision
        ///
        /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
        ///
        /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
        /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
        ///
        /// ```txt
        /// ctx_a
        ///   |
        ///   +-- clone() ---> ctx_c
        ///   |                  |
        ///   |                  +-- spawn() ---> Task C (ctx_d)
        ///   |
        ///   +-- spawn() ---> Task A (ctx_b)
        ///                              |
        ///                              +-- spawn() ---> Task B (ctx_e)
        ///
        /// Task A finishes or aborts --> Task B and Task C are aborted
        /// ```
        ///
        /// # Spawn Configuration
        ///
        /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
        /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
        ///
        /// Child tasks should assume they start from a clean configuration without needing to inspect how their
        /// parent was configured.
        fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
        where
            F: FnOnce(Self) -> Fut + Send + 'static,
            Fut: Future<Output = T> + Send + 'static,
            T: Send + 'static;

        /// Signals the runtime to stop execution and waits for all outstanding tasks
        /// to perform any required cleanup and exit.
        ///
        /// This method does not actually kill any tasks but rather signals to them, using
        /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
        /// It then waits for all [signal::Signal] references to be dropped before returning.
        ///
        /// ## Multiple Stop Calls
        ///
        /// This method is idempotent and safe to call multiple times concurrently (on
        /// different instances of the same context since it consumes `self`). The first
        /// call initiates shutdown with the provided `value`, and all subsequent calls
        /// will wait for the same completion regardless of their `value` parameter, i.e.
        /// the original `value` from the first call is preserved.
        ///
        /// ## Timeout
        ///
        /// If a timeout is provided, the method will return an error if all [signal::Signal]
        /// references have not been dropped within the specified duration.
        fn stop(
            self,
            value: i32,
            timeout: Option<Duration>,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
        /// any task.
        ///
        /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
        /// immediately. The [signal::Signal] returned will always resolve to the value of the
        /// first [Spawner::stop] call.
        fn stopped(&self) -> signal::Signal;
    }
254
255    /// Trait for creating [rayon]-compatible thread pools with each worker thread
256    /// placed on dedicated threads via [Spawner].
257    pub trait ThreadPooler: Spawner + Metrics {
258        /// Creates a clone-able [rayon]-compatible thread pool with [Spawner::spawn].
259        ///
260        /// # Arguments
261        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
262        ///
263        /// # Returns
264        /// A `Result` containing the configured [rayon::ThreadPool] or a [rayon::ThreadPoolBuildError] if the pool cannot
265        /// be built.
266        fn create_thread_pool(
267            &self,
268            concurrency: NonZeroUsize,
269        ) -> Result<ThreadPool, ThreadPoolBuildError>;
270
271        /// Creates a clone-able [Rayon] strategy for use with [commonware_parallel].
272        ///
273        /// # Arguments
274        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
275        ///
276        /// # Returns
277        /// A `Result` containing the configured [Rayon] strategy or a [rayon::ThreadPoolBuildError] if the pool cannot be
278        /// built.
279        fn create_strategy(
280            &self,
281            concurrency: NonZeroUsize,
282        ) -> Result<Rayon, ThreadPoolBuildError> {
283            self.create_thread_pool(concurrency).map(Rayon::with_pool)
284        }
285    }
286
    /// Interface to register and encode metrics.
    pub trait Metrics: Clone + Send + Sync + 'static {
        /// Get the current label of the context.
        fn label(&self) -> String;

        /// Create a new instance of `Metrics` with the given label appended to the end
        /// of the current `Metrics` label.
        ///
        /// This is commonly used to create a nested context for `register`.
        ///
        /// Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. It is not permitted for
        /// any implementation to use `METRICS_PREFIX` as the start of a label (reserved for metrics for the runtime).
        fn with_label(&self, label: &str) -> Self;

        /// Create a new instance of `Metrics` with an additional attribute (key-value pair) applied
        /// to all metrics registered in this context and any child contexts.
        ///
        /// Unlike [`Metrics::with_label`] which affects the metric name prefix, `with_attribute` adds
        /// a key-value pair that appears as a separate dimension in the metric output. This is
        /// useful for instrumenting n-ary data structures in a way that is easy to manage downstream.
        ///
        /// Keys must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. Values can be any string.
        ///
        /// # Labeling Children
        ///
        /// Attributes apply to the entire subtree of contexts. When you call `with_attribute`, the
        /// attribute is automatically added to all metrics registered in that context and any child
        /// contexts created via `with_label`:
        ///
        /// ```text
        /// context
        ///   |-- with_label("orchestrator")
        ///         |-- with_attribute("epoch", "5")
        ///               |-- counter: votes        -> orchestrator_votes{epoch="5"}
        ///               |-- counter: proposals    -> orchestrator_proposals{epoch="5"}
        ///               |-- with_label("engine")
        ///                     |-- gauge: height   -> orchestrator_engine_height{epoch="5"}
        /// ```
        ///
        /// This pattern avoids wrapping every metric in a `Family` and avoids polluting metric
        /// names with dynamic values like `orchestrator_epoch_5_votes`.
        ///
        /// _Using attributes does not reduce cardinality (N epochs still means N time series).
        /// Attributes just make metrics easier to query, filter, and aggregate._
        ///
        /// # Family Label Conflicts
        ///
        /// When using `Family` metrics, avoid using attribute keys that match the Family's label field names.
        /// If a conflict occurs, the encoded output will contain duplicate labels (e.g., `{env="prod",env="staging"}`),
        /// which is invalid Prometheus format and may cause scraping issues.
        ///
        /// ```ignore
        /// #[derive(EncodeLabelSet)]
        /// struct Labels { env: String }
        ///
        /// // BAD: attribute "env" conflicts with Family field "env"
        /// let ctx = context.with_attribute("env", "prod");
        /// let family: Family<Labels, Counter> = Family::default();
        /// ctx.register("requests", "help", family);
        /// // Produces invalid: requests_total{env="prod",env="staging"}
        ///
        /// // GOOD: use distinct names
        /// let ctx = context.with_attribute("region", "us_east");
        /// // Produces valid: requests_total{region="us_east",env="staging"}
        /// ```
        ///
        /// # Example
        ///
        /// ```ignore
        /// // Instead of creating epoch-specific metric names:
        /// let ctx = context.with_label(&format!("consensus_engine_{}", epoch));
        /// // Produces: consensus_engine_5_votes_total, consensus_engine_6_votes_total, ...
        ///
        /// // Use attributes to add epoch as a label dimension:
        /// let ctx = context.with_label("consensus_engine").with_attribute("epoch", epoch);
        /// // Produces: consensus_engine_votes_total{epoch="5"}, consensus_engine_votes_total{epoch="6"}, ...
        /// ```
        ///
        /// Multiple attributes can be chained:
        /// ```ignore
        /// let ctx = context
        ///     .with_label("engine")
        ///     .with_attribute("region", "us_east")
        ///     .with_attribute("instance", "i1");
        /// // Produces: engine_requests_total{region="us_east",instance="i1"} 42
        /// ```
        ///
        /// # Querying The Latest Attribute
        ///
        /// To query the latest attribute value dynamically, create a gauge to track the current value:
        /// ```ignore
        /// // Create a gauge to track the current epoch
        /// let latest_epoch = Gauge::<i64>::default();
        /// context.with_label("orchestrator").register("latest_epoch", "current epoch", latest_epoch.clone());
        /// latest_epoch.set(current_epoch);
        /// // Produces: orchestrator_latest_epoch 5
        /// ```
        ///
        /// Then create a dashboard variable `$latest_epoch` with query `max(orchestrator_latest_epoch)`
        /// and use it in panel queries: `consensus_engine_votes_total{epoch="$latest_epoch"}`
        fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;

        /// Register a metric with the runtime.
        ///
        /// Any registered metric will include (as a prefix) the label of the current context.
        ///
        /// Names must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`.
        fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);

        /// Encode all metrics into a buffer.
        ///
        /// To ensure downstream analytics tools work correctly, users must never duplicate metrics
        /// (via the concatenation of nested `with_label` and `register` calls). This can be
        /// avoided by using `with_label` and `with_attribute` to create new context instances
        /// (ensures all context instances are namespaced).
        fn encode(&self) -> String;

        /// Create a scoped context for metrics with a bounded lifetime (e.g., per-epoch
        /// consensus engines). All metrics registered through the returned context (and
        /// child contexts via [`Metrics::with_label`]/[`Metrics::with_attribute`]) go into
        /// a separate registry that is automatically removed when all clones of the scoped
        /// context are dropped.
        ///
        /// If the context is already scoped, returns a clone with the same scope (scopes
        /// nest by inheritance, not by creating new independent scopes).
        ///
        /// # Uniqueness
        ///
        /// Scoped metrics share the same global uniqueness constraint as
        /// [`Metrics::register`]: each (prefixed_name, attributes) pair must be unique.
        /// Callers should use [`Metrics::with_attribute`] to distinguish metrics across
        /// scopes (e.g., an "epoch" attribute) rather than re-registering identical keys.
        ///
        /// # Example
        ///
        /// ```ignore
        /// let scoped = context
        ///     .with_label("engine")
        ///     .with_attribute("epoch", epoch)
        ///     .with_scope();
        ///
        /// // Register metrics into the scoped registry
        /// let counter = Counter::default();
        /// scoped.register("votes", "vote count", counter.clone());
        ///
        /// // Metrics are removed when all clones of `scoped` are dropped.
        /// ```
        fn with_scope(&self) -> Self;
    }
436
    /// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating single-entity rate limiters
    /// (with [governor::middleware::NoOpMiddleware], i.e., no extra decision middleware).
    /// For per-key rate limiting, use [KeyedRateLimiter].
    pub type RateLimiter<C> = governor::RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;

    /// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating per-peer rate limiters
    /// using governor's [HashMapStateStore] (one in-memory state entry per key).
    ///
    /// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
    pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
        K,
        governor::state::keyed::HashMapStateStore<K>,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
460
    /// Interface that any task scheduler must implement to provide
    /// time-based operations.
    ///
    /// It is necessary to mock time to provide deterministic execution
    /// of arbitrary tasks.
    pub trait Clock:
        governor::clock::Clock<Instant = SystemTime>
        + governor::clock::ReasonablyRealtime
        + Clone
        + Send
        + Sync
        + 'static
    {
        /// Returns the current time.
        fn current(&self) -> SystemTime;

        /// Sleep for the given duration.
        fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

        /// Sleep until the given deadline.
        fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

        /// Await a future with a timeout, returning `Error::Timeout` if it expires.
        ///
        /// The `future` is raced against [Clock::sleep] for `duration`; whichever
        /// completes first determines the result.
        ///
        /// # Examples
        ///
        /// ```
        /// use std::time::Duration;
        /// use commonware_runtime::{deterministic, Error, Runner, Clock};
        ///
        /// let executor = deterministic::Runner::default();
        /// executor.start(|context| async move {
        ///     match context
        ///         .timeout(Duration::from_millis(100), async { 42 })
        ///         .await
        ///     {
        ///         Ok(value) => assert_eq!(value, 42),
        ///         Err(Error::Timeout) => panic!("should not timeout"),
        ///         Err(e) => panic!("unexpected error: {:?}", e),
        ///     }
        /// });
        /// ```
        fn timeout<F, T>(
            &self,
            duration: Duration,
            future: F,
        ) -> impl Future<Output = Result<T, Error>> + Send + '_
        where
            F: Future<Output = T> + Send + 'static,
            T: Send + 'static,
        {
            async move {
                // Race the future against a sleep; the first branch to complete
                // decides whether we return its output or a timeout error.
                select! {
                    result = future => Ok(result),
                    _ = self.sleep(duration) => Err(Error::Timeout),
                }
            }
        }
    }
520
521    /// Syntactic sugar for the type of [Sink] used by a given [Network] N.
522    pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
523
524    /// Syntactic sugar for the type of [Stream] used by a given [Network] N.
525    pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
526
527    /// Syntactic sugar for the type of [Listener] used by a given [Network] N.
528    pub type ListenerOf<N> = <N as crate::Network>::Listener;
529
    /// Interface that any runtime must implement to create
    /// network connections.
    pub trait Network: Clone + Send + Sync + 'static {
        /// The type of [Listener] that's returned when binding to a socket.
        /// Accepting a connection returns a [Sink] and [Stream] which are defined
        /// by the [Listener] and used to send and receive data over the connection.
        type Listener: Listener;

        /// Bind to the given socket address, returning a [Listener] for
        /// accepting incoming connections.
        fn bind(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

        /// Dial the given socket address.
        ///
        /// On success, returns the [Sink] (outbound) and [Stream] (inbound)
        /// halves of the established connection.
        fn dial(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
    }
550
    /// Interface for DNS resolution.
    pub trait Resolver: Clone + Send + Sync + 'static {
        /// Resolve a hostname to IP addresses.
        ///
        /// Returns a list of IP addresses that the hostname resolves to.
        /// On failure, implementations presumably return [Error::ResolveFailed]
        /// — confirm against the concrete runtime.
        fn resolve(
            &self,
            host: &str,
        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
    }
561
    /// Interface that any runtime must implement to handle
    /// incoming network connections.
    pub trait Listener: Sync + Send + 'static {
        /// The type of [Sink] that's returned when accepting a connection.
        /// This is used to send data to the remote connection.
        type Sink: Sink;
        /// The type of [Stream] that's returned when accepting a connection.
        /// This is used to receive data from the remote connection.
        type Stream: Stream;

        /// Accept an incoming connection, returning the remote peer's address
        /// along with the [Sink] and [Stream] halves of the connection.
        fn accept(
            &mut self,
        ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

        /// Returns the local address of the listener.
        ///
        /// NOTE(review): unlike the other methods on this trait, this returns
        /// [std::io::Error] directly rather than the crate's [Error].
        fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
    }
580
    /// Interface that any runtime must implement to send
    /// messages over a network connection.
    pub trait Sink: Sync + Send + 'static {
        /// Send a message to the sink.
        ///
        /// Accepts anything convertible into [IoBufs] (single or multiple buffers).
        ///
        /// # Warning
        ///
        /// If the sink returns an error, part of the message may still be delivered.
        fn send(
            &mut self,
            bufs: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
594
    /// Interface that any runtime must implement to receive
    /// messages over a network connection.
    pub trait Stream: Sync + Send + 'static {
        /// Receive exactly `len` bytes from the stream.
        ///
        /// The runtime allocates the buffer and returns it as `IoBufs`.
        ///
        /// # Warning
        ///
        /// If the stream returns an error, partially read data may be discarded.
        fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;

        /// Peek at buffered data without consuming.
        ///
        /// Returns up to `max_len` bytes from the internal buffer (possibly fewer),
        /// or an empty slice if no data is currently buffered. This does not
        /// perform any I/O or block.
        ///
        /// This is useful e.g. for parsing length prefixes without committing to a read
        /// or paying the cost of async.
        fn peek(&self, max_len: usize) -> &[u8];
    }
616
617    /// Interface to interact with storage.
618    ///
619    /// To support storage implementations that enable concurrent reads and
620    /// writes, blobs are responsible for maintaining synchronization.
621    ///
622    /// Storage can be backed by a local filesystem, cloud storage, etc.
623    ///
624    /// # Partition Names
625    ///
626    /// Partition names must be non-empty and contain only ASCII alphanumeric
627    /// characters, dashes (`-`), or underscores (`_`). Names containing other
628    /// characters (e.g., `/`, `.`, spaces) will return an error.
629    pub trait Storage: Clone + Send + Sync + 'static {
630        /// The readable/writeable storage buffer that can be opened by this Storage.
631        type Blob: Blob;
632
633        /// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
634        /// in the versions range. The blob version is omitted from the return value.
635        fn open(
636            &self,
637            partition: &str,
638            name: &[u8],
639        ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
640            async move {
641                let (blob, size, _) = self
642                    .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
643                    .await?;
644                Ok((blob, size))
645            }
646        }
647
648        /// Open an existing blob in a given partition or create a new one, returning
649        /// the blob and its length.
650        ///
651        /// Multiple instances of the same blob can be opened concurrently, however,
652        /// writing to the same blob concurrently may lead to undefined behavior.
653        ///
654        /// An Ok result indicates the blob is durably created (or already exists).
655        ///
656        /// # Versions
657        ///
658        /// Blobs are versioned. If the blob's version is not in `versions`, returns
659        /// [Error::BlobVersionMismatch].
660        ///
661        /// # Returns
662        ///
663        /// A tuple of (blob, logical_size, blob_version).
664        fn open_versioned(
665            &self,
666            partition: &str,
667            name: &[u8],
668            versions: std::ops::RangeInclusive<u16>,
669        ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
670
671        /// Remove a blob from a given partition.
672        ///
673        /// If no `name` is provided, the entire partition is removed.
674        ///
675        /// An Ok result indicates the blob is durably removed.
676        fn remove(
677            &self,
678            partition: &str,
679            name: Option<&[u8]>,
680        ) -> impl Future<Output = Result<(), Error>> + Send;
681
682        /// Return all blobs in a given partition.
683        fn scan(&self, partition: &str)
684            -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
685    }
686
    /// Interface to read and write to a blob.
    ///
    /// To support blob implementations that enable concurrent reads and
    /// writes, blobs are responsible for maintaining synchronization.
    ///
    /// Cloning a blob is similar to wrapping a single file descriptor in
    /// a lock whereas opening a new blob (of the same name) is similar to
    /// opening a new file descriptor. If multiple blobs are opened with the same
    /// name, they are not expected to coordinate access to underlying storage
    /// and writing to both is undefined behavior.
    ///
    /// When a blob is dropped, any unsynced changes may be discarded. Implementations
    /// may attempt to sync during drop but errors will go unhandled. Call `sync`
    /// before dropping to ensure all changes are durably persisted.
    #[allow(clippy::len_without_is_empty)]
    pub trait Blob: Clone + Send + Sync + 'static {
        /// Read `len` bytes at `offset` into caller-provided buffer(s).
        ///
        /// The caller provides the buffer(s), and the implementation fills it with
        /// exactly `len` bytes of data read from the blob starting at `offset`.
        /// Returns the same buffer(s), filled with data.
        ///
        /// # Contract
        ///
        /// - The returned buffers reuse caller-provided storage, with exactly `len`
        ///   bytes filled from `offset`.
        /// - Caller-provided chunk layout is preserved.
        ///
        /// # Panics
        ///
        /// Panics if `len` exceeds the total capacity of `bufs`.
        fn read_at_buf(
            &self,
            offset: u64,
            len: usize,
            bufs: impl Into<IoBufsMut> + Send,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Read `len` bytes at `offset`, returning a buffer(s) with exactly `len` bytes
        /// of data read from the blob starting at `offset`.
        ///
        /// To reuse a buffer(s), use [`Blob::read_at_buf`].
        fn read_at(
            &self,
            offset: u64,
            len: usize,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Write `bufs` to the blob at the given offset.
        fn write_at(
            &self,
            offset: u64,
            bufs: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Resize the blob to the given length.
        ///
        /// If the length is greater than the current length, the blob is extended with zeros.
        /// If the length is less than the current length, the blob is truncated.
        fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

        /// Ensure all pending data is durably persisted.
        fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
    }
751
    /// Interface that any runtime must implement to provide buffer pools.
    ///
    /// Distinct pools are exposed for network and storage I/O so each can be
    /// managed independently.
    pub trait BufferPooler: Clone + Send + Sync + 'static {
        /// Returns the network [BufferPool].
        fn network_buffer_pool(&self) -> &BufferPool;

        /// Returns the storage [BufferPool].
        fn storage_buffer_pool(&self) -> &BufferPool;
    }
760});
761stability_scope!(BETA, cfg(feature = "external") {
    /// Interface that runtimes can implement to constrain the execution latency of a future.
    pub trait Pacer: Clock + Clone + Send + Sync + 'static {
        /// Defer completion of a future until a specified `latency` has elapsed. If the future is
        /// not yet ready at the desired time of completion, the runtime will block until the future
        /// is ready.
        ///
        /// In [crate::deterministic], this is used to ensure interactions with external systems can
        /// be interacted with deterministically. In [crate::tokio], this is a no-op (allows
        /// multiple runtimes to be tested with no code changes).
        ///
        /// # Parameters
        ///
        /// - `latency`: minimum (wall-clock or simulated) time before the result is returned.
        /// - `future`: the future whose completion is deferred.
        ///
        /// # Setting Latency
        ///
        /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
        /// the expected resolution latency of the future. To better explore the possible behavior of an
        /// application, users can set latency to a randomly chosen value in the range of
        /// `[expected latency / 2, expected latency * 2]`.
        ///
        /// # Warning
        ///
        /// Because `pace` blocks if the future is not ready, it is important that the future's completion
        /// doesn't require anything in the current thread to complete (or else it will deadlock).
        fn pace<'a, F, T>(
            &'a self,
            latency: Duration,
            future: F,
        ) -> impl Future<Output = T> + Send + 'a
        where
            F: Future<Output = T> + Send + 'a,
            T: Send + 'a;
    }
792
    /// Extension trait that makes it more ergonomic to use [Pacer].
    ///
    /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
    /// runtime should delay completion relative to the clock.
    pub trait FutureExt: Future + Send + Sized {
        /// Delay completion of the future until a specified `latency` on `pacer`.
        ///
        /// Equivalent to `pacer.pace(latency, self)`; exists only to allow
        /// method-chaining on the future.
        fn pace<'a, E>(
            self,
            pacer: &'a E,
            latency: Duration,
        ) -> impl Future<Output = Self::Output> + Send + 'a
        where
            E: Pacer + 'a,
            Self: Send + 'a,
            Self::Output: Send + 'a,
        {
            pacer.pace(latency, self)
        }
    }

    // Blanket impl: every sendable future gains `.pace(...)` for free.
    impl<F> FutureExt for F where F: Future + Send {}
814});
815
816#[cfg(test)]
817mod tests {
818    use super::*;
819    use crate::telemetry::traces::collector::TraceStorage;
820    use bytes::Bytes;
821    use commonware_macros::{select, test_collect_traces};
822    use commonware_utils::{
823        channel::{mpsc, oneshot},
824        sync::Mutex,
825        NZUsize,
826    };
827    use futures::{
828        future::{pending, ready},
829        join, pin_mut, FutureExt,
830    };
831    use prometheus_client::{
832        encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue},
833        metrics::{counter::Counter, family::Family},
834    };
835    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
836    use std::{
837        collections::HashMap,
838        net::{IpAddr, Ipv4Addr, Ipv6Addr},
839        pin::Pin,
840        str::FromStr,
841        sync::{
842            atomic::{AtomicU32, Ordering},
843            Arc,
844        },
845        task::{Context as TContext, Poll, Waker},
846    };
847    use tracing::{error, Level};
848    use utils::reschedule;
849
850    fn test_error_future<R: Runner>(runner: R) {
851        #[allow(clippy::unused_async)]
852        async fn error_future() -> Result<&'static str, &'static str> {
853            Err("An error occurred")
854        }
855        let result = runner.start(|_| error_future());
856        assert_eq!(result, Err("An error occurred"));
857    }
858
859    fn test_clock_sleep<R: Runner>(runner: R)
860    where
861        R::Context: Spawner + Clock,
862    {
863        runner.start(|context| async move {
864            // Capture initial time
865            let start = context.current();
866            let sleep_duration = Duration::from_millis(10);
867            context.sleep(sleep_duration).await;
868
869            // After run, time should have advanced
870            let end = context.current();
871            assert!(end.duration_since(start).unwrap() >= sleep_duration);
872        });
873    }
874
875    fn test_clock_sleep_until<R: Runner>(runner: R)
876    where
877        R::Context: Spawner + Clock + Metrics,
878    {
879        runner.start(|context| async move {
880            // Trigger sleep
881            let now = context.current();
882            context.sleep_until(now + Duration::from_millis(100)).await;
883
884            // Ensure slept duration has elapsed
885            let elapsed = now.elapsed().unwrap();
886            assert!(elapsed >= Duration::from_millis(100));
887        });
888    }
889
890    fn test_clock_timeout<R: Runner>(runner: R)
891    where
892        R::Context: Spawner + Clock,
893    {
894        runner.start(|context| async move {
895            // Future completes before timeout
896            let result = context
897                .timeout(Duration::from_millis(100), async { "success" })
898                .await;
899            assert_eq!(result.unwrap(), "success");
900
901            // Future exceeds timeout duration
902            let result = context
903                .timeout(Duration::from_millis(50), pending::<()>())
904                .await;
905            assert!(matches!(result, Err(Error::Timeout)));
906
907            // Future completes within timeout
908            let result = context
909                .timeout(
910                    Duration::from_millis(100),
911                    context.sleep(Duration::from_millis(50)),
912                )
913                .await;
914            assert!(result.is_ok());
915        });
916    }
917
918    fn test_root_finishes<R: Runner>(runner: R)
919    where
920        R::Context: Spawner,
921    {
922        runner.start(|context| async move {
923            context.spawn(|_| async move {
924                loop {
925                    reschedule().await;
926                }
927            });
928        });
929    }
930
931    fn test_spawn_after_abort<R>(runner: R)
932    where
933        R: Runner,
934        R::Context: Spawner + Clone,
935    {
936        runner.start(|context| async move {
937            // Create a child context
938            let child = context.clone();
939
940            // Spawn parent and abort
941            let parent_handle = context.spawn(move |_| async move {
942                pending::<()>().await;
943            });
944            parent_handle.abort();
945
946            // Spawn child and ensure it aborts
947            let child_handle = child.spawn(move |_| async move {
948                pending::<()>().await;
949            });
950            assert!(matches!(child_handle.await, Err(Error::Closed)));
951        });
952    }
953
954    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
955    where
956        R::Context: Spawner,
957    {
958        runner.start(|context| async move {
959            let context = if dedicated {
960                assert!(!blocking);
961                context.dedicated()
962            } else {
963                context.shared(blocking)
964            };
965
966            let handle = context.spawn(|_| async move {
967                loop {
968                    reschedule().await;
969                }
970            });
971            handle.abort();
972            assert!(matches!(handle.await, Err(Error::Closed)));
973        });
974    }
975
976    fn test_panic_aborts_root<R: Runner>(runner: R) {
977        let result: Result<(), Error> = runner.start(|_| async move {
978            panic!("blah");
979        });
980        result.unwrap_err();
981    }
982
983    fn test_panic_aborts_spawn<R: Runner>(runner: R)
984    where
985        R::Context: Spawner + Clock,
986    {
987        runner.start(|context| async move {
988            context.clone().spawn(|_| async move {
989                panic!("blah");
990            });
991
992            // Loop until panic
993            loop {
994                context.sleep(Duration::from_millis(100)).await;
995            }
996        });
997    }
998
999    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1000    where
1001        R::Context: Spawner + Clock,
1002    {
1003        let result: Result<(), Error> = runner.start(|context| async move {
1004            let result = context.clone().spawn(|_| async move {
1005                panic!("blah");
1006            });
1007            result.await
1008        });
1009        assert!(matches!(result, Err(Error::Exited)));
1010    }
1011
    /// Verify that the runtime aborts when several spawned tasks panic
    /// (no panic may be silently swallowed, even with multiple in flight).
    fn test_multiple_panics<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Spawn three tasks that all panic immediately.
            context.clone().spawn(|_| async move {
                panic!("boom 1");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 2");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 3");
            });

            // Loop until panic: the root never finishes on its own, so the
            // test only passes if a spawned panic tears down the runtime.
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
1033
1034    fn test_multiple_panics_caught<R: Runner>(runner: R)
1035    where
1036        R::Context: Spawner + Clock,
1037    {
1038        let (res1, res2, res3) = runner.start(|context| async move {
1039            let handle1 = context.clone().spawn(|_| async move {
1040                panic!("boom 1");
1041            });
1042            let handle2 = context.clone().spawn(|_| async move {
1043                panic!("boom 2");
1044            });
1045            let handle3 = context.clone().spawn(|_| async move {
1046                panic!("boom 3");
1047            });
1048
1049            join!(handle1, handle2, handle3)
1050        });
1051        assert!(matches!(res1, Err(Error::Exited)));
1052        assert!(matches!(res2, Err(Error::Exited)));
1053        assert!(matches!(res3, Err(Error::Exited)));
1054    }
1055
    /// Verify `select!` branch selection: when multiple branches are ready,
    /// the first-listed one wins; a pending branch is skipped.
    fn test_select<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Test first branch: both branches are immediately ready, so the
            // first-listed one must be taken (output becomes 1, not 2).
            let output = Mutex::new(0);
            select! {
                v1 = ready(1) => {
                    *output.lock() = v1;
                },
                v2 = ready(2) => {
                    *output.lock() = v2;
                },
            };
            assert_eq!(*output.lock(), 1);

            // Test second branch: the first branch never resolves, so the
            // ready second branch must be taken instead.
            select! {
                v1 = std::future::pending::<i32>() => {
                    *output.lock() = v1;
                },
                v2 = ready(2) => {
                    *output.lock() = v2;
                },
            };
            assert_eq!(*output.lock(), 2);
        });
    }
1082
    /// Ensure future fusing works as expected.
    ///
    /// Runs `select!` repeatedly against the same channel receiver to verify
    /// that losing a race does not consume or drop queued messages.
    fn test_select_loop<R: Runner>(runner: R)
    where
        R::Context: Clock,
    {
        runner.start(|context| async move {
            // Should hit timeout: the channel is empty, so only the sleep
            // branch can complete on each iteration.
            let (sender, mut receiver) = mpsc::unbounded_channel();
            for _ in 0..2 {
                select! {
                    v = receiver.recv() => {
                        panic!("unexpected value: {v:?}");
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }

            // Populate channel
            sender.send(0).unwrap();
            sender.send(1).unwrap();

            // Prefer not reading channel without losing messages: the
            // first-listed, immediately-ready branch wins even though the
            // channel now holds data.
            select! {
                _ = async {} => {
                    // Skip reading from channel even though populated
                },
                v = receiver.recv() => {
                    panic!("unexpected value: {v:?}");
                },
            };

            // Process messages: both queued values must still arrive in
            // order, proving the losing branch above did not consume them.
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.recv() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
1129
    /// Exercise the full storage lifecycle: open, write, sync, read, scan,
    /// reopen, partial read, remove a blob, and remove a partition.
    fn test_storage_operations<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob";

            // Open a new blob; a blob that did not previously exist reports size 0.
            let (blob, size) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0, "new blob should have size 0");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob so the write is durable before we reopen later.
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob
            let read = blob
                .read_at(0, data.len())
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Scan blobs in the partition; the blob we wrote must be listed.
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(blobs.contains(&name.to_vec()));

            // Reopen the blob; the reported length must reflect the synced write.
            let (blob, len) = context
                .open(partition, name)
                .await
                .expect("Failed to reopen blob");
            assert_eq!(len, data.len() as u64);

            // Read data part of message back (offset 7 = "Storage").
            let read = blob.read_at(7, 7).await.expect("Failed to read data");
            assert_eq!(read.coalesce(), b"Storage");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Remove the blob (Some(name) removes just that blob).
            context
                .remove(partition, Some(name))
                .await
                .expect("Failed to remove blob");

            // Ensure the blob is removed
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(!blobs.contains(&name.to_vec()));

            // Remove the partition (None removes the entire partition).
            context
                .remove(partition, None)
                .await
                .expect("Failed to remove partition");

            // Scan the partition: scanning a removed partition must error.
            let result = context.scan(partition).await;
            assert!(matches!(result, Err(Error::PartitionMissing(_))));
        });
    }
1209
1210    fn test_blob_read_write<R: Runner>(runner: R)
1211    where
1212        R::Context: Storage,
1213    {
1214        runner.start(|context| async move {
1215            let partition = "test_partition";
1216            let name = b"test_blob_rw";
1217
1218            // Open a new blob
1219            let (blob, _) = context
1220                .open(partition, name)
1221                .await
1222                .expect("Failed to open blob");
1223
1224            // Write data at different offsets
1225            let data1 = b"Hello";
1226            let data2 = b"World";
1227            blob.write_at(0, data1)
1228                .await
1229                .expect("Failed to write data1");
1230            blob.write_at(5, data2)
1231                .await
1232                .expect("Failed to write data2");
1233
1234            // Read data back
1235            let read = blob.read_at(0, 10).await.expect("Failed to read data");
1236            let read = read.coalesce();
1237            assert_eq!(&read.as_ref()[..5], data1);
1238            assert_eq!(&read.as_ref()[5..], data2);
1239
1240            // Read past end of blob
1241            let result = blob.read_at(10, 10).await;
1242            assert!(result.is_err());
1243
1244            // Rewrite data without affecting length
1245            let data3 = b"Store";
1246            blob.write_at(5, data3)
1247                .await
1248                .expect("Failed to write data3");
1249
1250            // Read data back
1251            let read = blob.read_at(0, 10).await.expect("Failed to read data");
1252            let read = read.coalesce();
1253            assert_eq!(&read.as_ref()[..5], data1);
1254            assert_eq!(&read.as_ref()[5..], data3);
1255
1256            // Read past end of blob
1257            let result = blob.read_at(10, 10).await;
1258            assert!(result.is_err());
1259        });
1260    }
1261
    /// Verify `Blob::resize` in both directions: extending zero-fills the new
    /// region; truncating discards data past the new length. Each step syncs
    /// and reopens to confirm the size change is durable.
    fn test_blob_resize<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition_resize";
            let name = b"test_blob_resize";

            // Open and write to a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            let data = b"some data";
            blob.write_at(0, data.to_vec())
                .await
                .expect("Failed to write");
            blob.sync().await.expect("Failed to sync after write");

            // Re-open and check length
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);

            // Resize to extend the file to double its length.
            let new_len = (data.len() as u64) * 2;
            blob.resize(new_len)
                .await
                .expect("Failed to resize to extend");
            blob.sync().await.expect("Failed to sync after resize");

            // Re-open and check length again
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, new_len);

            // Read original data: extension must not disturb existing bytes.
            let read_buf = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);

            // Read extended part (should be zeros)
            let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
            assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());

            // Truncate the blob back to the original length.
            blob.resize(data.len() as u64).await.unwrap();
            blob.sync().await.unwrap();

            // Reopen to check truncation
            let (blob, size) = context.open(partition, name).await.unwrap();
            assert_eq!(size, data.len() as u64);

            // Read truncated data: the surviving prefix must be intact.
            let read_buf = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);
            blob.sync().await.unwrap();
        });
    }
1319
1320    fn test_many_partition_read_write<R: Runner>(runner: R)
1321    where
1322        R::Context: Storage,
1323    {
1324        runner.start(|context| async move {
1325            let partitions = ["partition1", "partition2", "partition3"];
1326            let name = b"test_blob_rw";
1327            let data1 = b"Hello";
1328            let data2 = b"World";
1329
1330            for (additional, partition) in partitions.iter().enumerate() {
1331                // Open a new blob
1332                let (blob, _) = context
1333                    .open(partition, name)
1334                    .await
1335                    .expect("Failed to open blob");
1336
1337                // Write data at different offsets
1338                blob.write_at(0, data1)
1339                    .await
1340                    .expect("Failed to write data1");
1341                blob.write_at(5 + additional as u64, data2)
1342                    .await
1343                    .expect("Failed to write data2");
1344
1345                // Sync the blob
1346                blob.sync().await.expect("Failed to sync blob");
1347            }
1348
1349            for (additional, partition) in partitions.iter().enumerate() {
1350                // Open a new blob
1351                let (blob, len) = context
1352                    .open(partition, name)
1353                    .await
1354                    .expect("Failed to open blob");
1355                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1356
1357                // Read data back
1358                let read = blob
1359                    .read_at(0, 10 + additional)
1360                    .await
1361                    .expect("Failed to read data");
1362                let read = read.coalesce();
1363                assert_eq!(&read.as_ref()[..5], b"Hello");
1364                assert_eq!(&read.as_ref()[5 + additional..], b"World");
1365            }
1366        });
1367    }
1368
1369    fn test_blob_read_past_length<R: Runner>(runner: R)
1370    where
1371        R::Context: Storage,
1372    {
1373        runner.start(|context| async move {
1374            let partition = "test_partition";
1375            let name = b"test_blob_rw";
1376
1377            // Open a new blob
1378            let (blob, _) = context
1379                .open(partition, name)
1380                .await
1381                .expect("Failed to open blob");
1382
1383            // Read data past file length (empty file)
1384            let result = blob.read_at(0, 10).await;
1385            assert!(result.is_err());
1386
1387            // Write data to the blob
1388            let data = b"Hello, Storage!".to_vec();
1389            blob.write_at(0, data)
1390                .await
1391                .expect("Failed to write to blob");
1392
1393            // Read data past file length (non-empty file)
1394            let result = blob.read_at(0, 20).await;
1395            assert!(result.is_err());
1396        })
1397    }
1398
    /// Verify that cloned blobs can read concurrently from separate tasks and
    /// that the runtime's `open_blobs` metric drops to zero once all clones
    /// (and the original) are dropped.
    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
    where
        R::Context: Spawner + Storage + Metrics,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob in clone: each spawned task owns its
            // own clone of the blob handle.
            let check1 = context.with_label("check1").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, data_len)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });
            let check2 = context.with_label("check2").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, data_len)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });

            // Wait for both reads to complete (also drops the task-owned clones).
            let result = join!(check1, check2);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());

            // Read data from the blob: the original handle still works after
            // its clones are gone.
            let read = blob
                .read_at(0, data.len())
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Drop the blob
            drop(blob);

            // Ensure no blobs still open: with every handle dropped, the
            // encoded metrics must report an open_blobs gauge of zero.
            let buffer = context.encode();
            assert!(buffer.contains("open_blobs 0"));
        });
    }
1466
    /// Verify the shutdown signal: a task waiting on `stopped()` observes the
    /// stop value, `stop()` waits for signal holders to finish, and a task
    /// spawned after `stop()` sees the signal resolve immediately.
    fn test_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 9;
        runner.start(|context| async move {
            // Spawn a task that waits for signal
            let before = context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    // Dropping the signal releases this task's hold on shutdown.
                    drop(signal);
                });

            // Signal the tasks and wait for them to stop
            let result = context.clone().stop(kill, None).await;
            assert!(result.is_ok());

            // Spawn a task after stop is called
            let after = context
                .with_label("after")
                .spawn(move |context| async move {
                    // A call to `stopped()` after `stop()` resolves immediately
                    let value = context.stopped().await.unwrap();
                    assert_eq!(value, kill);
                });

            // Ensure both tasks complete
            let result = join!(before, after);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
        });
    }
1502
    /// Verify that `stop()` waits for every task holding a shutdown signal to
    /// finish its cleanup (of varying durations) before returning.
    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            let (started_tx, mut started_rx) = mpsc::channel(3);
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn 3 tasks that do cleanup work after receiving stop signal
            // and increment a shared counter
            let task = |cleanup_duration: Duration| {
                let context = context.clone();
                let counter = counter.clone();
                let started_tx = started_tx.clone();
                context.spawn(move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).await.unwrap();

                    // Increment once killed
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    context.sleep(cleanup_duration).await;
                    counter.fetch_add(1, Ordering::SeqCst);

                    // Wait to drop signal until work has been done
                    drop(signal);
                })
            };

            let task1 = task(Duration::from_millis(10));
            let task2 = task(Duration::from_millis(20));
            let task3 = task(Duration::from_millis(30));

            // Give tasks time to start: each sends once it holds its signal.
            for _ in 0..3 {
                started_rx.recv().await.unwrap();
            }

            // Stop and verify all cleanup completed: stop() must not return
            // until all three counters were incremented.
            context.stop(kill, None).await.unwrap();
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // Ensure all tasks completed
            let result = join!(task1, task2, task3);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            assert!(result.2.is_ok());
        });
    }
1554
1555    fn test_shutdown_timeout<R: Runner>(runner: R)
1556    where
1557        R::Context: Spawner + Metrics + Clock,
1558    {
1559        let kill = 42;
1560        runner.start(|context| async move {
1561            // Setup startup coordinator
1562            let (started_tx, started_rx) = oneshot::channel();
1563
1564            // Spawn a task that never completes its cleanup
1565            context.clone().spawn(move |context| async move {
1566                let signal = context.stopped();
1567                started_tx.send(()).unwrap();
1568                pending::<()>().await;
1569                signal.await.unwrap();
1570            });
1571
1572            // Try to stop with a timeout
1573            started_rx.await.unwrap();
1574            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1575
1576            // Assert that we got a timeout error
1577            assert!(matches!(result, Err(Error::Timeout)));
1578        });
1579    }
1580
    // Verifies that concurrent `stop` calls are safe: both calls block until
    // every task holding a shutdown signal releases it, the value from the
    // FIRST stop call wins, and a `stop` issued after shutdown has completed
    // resolves immediately.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays completion to test timing
            let task = context.with_label("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // Wait for signal to be resolved
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    context.sleep(Duration::from_millis(50)).await;

                    // Increment counter
                    counter.fetch_add(1, Ordering::SeqCst);
                    // Holding `signal` until this point keeps both stop futures
                    // below pending until the work above has finished.
                    drop(signal);
                }
            });

            // Give task time to start
            started_rx.await.unwrap();

            // Issue two separate stop calls
            // The second stop call uses a different stop value that should be ignored
            let stop_task1 = context.clone().stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.clone().stop(kill2, None);
            pin_mut!(stop_task2);

            // Both of them should be awaiting completion
            // (`now_or_never` polls each future exactly once without blocking).
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Wait for both stop calls to complete
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // Verify first stop value wins
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // Wait for blocking task to complete
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Post-completion stop should return immediately
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1642
1643    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1644    where
1645        R::Context: Spawner + Metrics,
1646    {
1647        runner.start(|context| async move {
1648            // Spawn a task that waits for signal
1649            context
1650                .with_label("before")
1651                .spawn(move |context| async move {
1652                    let mut signal = context.stopped();
1653                    let value = (&mut signal).await.unwrap();
1654
1655                    // We should never reach this point
1656                    assert_eq!(value, 42);
1657                    drop(signal);
1658                });
1659
1660            // Ensure waker is registered
1661            reschedule().await;
1662        });
1663    }
1664
1665    fn test_spawn_dedicated<R: Runner>(runner: R)
1666    where
1667        R::Context: Spawner,
1668    {
1669        runner.start(|context| async move {
1670            let handle = context.dedicated().spawn(|_| async move { 42 });
1671            assert!(matches!(handle.await, Ok(42)));
1672        });
1673    }
1674
1675    fn test_spawn<R: Runner>(runner: R)
1676    where
1677        R::Context: Spawner + Clock,
1678    {
1679        runner.start(|context| async move {
1680            let child_handle = Arc::new(Mutex::new(None));
1681            let child_handle2 = child_handle.clone();
1682
1683            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1684            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1685            let parent_handle = context.spawn(move |context| async move {
1686                // Spawn child that completes immediately
1687                let handle = context.spawn(|_| async {});
1688
1689                // Store child handle so we can test it later
1690                *child_handle2.lock() = Some(handle);
1691
1692                parent_initialized_tx.send(()).unwrap();
1693
1694                // Parent task completes
1695                parent_complete_rx.await.unwrap();
1696            });
1697
1698            // Wait for parent task to spawn the children
1699            parent_initialized_rx.await.unwrap();
1700
1701            // Child task completes successfully
1702            let child_handle = child_handle.lock().take().unwrap();
1703            assert!(child_handle.await.is_ok());
1704
1705            // Complete the parent task
1706            parent_complete_tx.send(()).unwrap();
1707
1708            // Wait for parent task to complete successfully
1709            assert!(parent_handle.await.is_ok());
1710        });
1711    }
1712
    // Verifies that aborting a parent task cascades to its still-running
    // child: both handles must resolve with `Error::Closed`.
    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Slot used to smuggle the child's handle out of the parent task.
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn child task that hangs forever, should be aborted when parent aborts
                let handle = context.spawn(|_| pending::<()>());

                // Store child task handle so we can test it later
                *child_handle2.lock() = Some(handle);

                parent_initialized_tx.send(()).unwrap();

                // Parent task runs until aborted
                pending::<()>().await
            });

            // Wait for parent task to spawn the children
            parent_initialized_rx.await.unwrap();

            // Abort parent task
            parent_handle.abort();
            assert!(matches!(parent_handle.await, Err(Error::Closed)));

            // Child task should also resolve with error since its parent aborted
            let child_handle = child_handle.lock().take().unwrap();
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1747
1748    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1749    where
1750        R::Context: Spawner + Clock,
1751    {
1752        runner.start(|context| async move {
1753            let child_handle = Arc::new(Mutex::new(None));
1754            let child_handle2 = child_handle.clone();
1755
1756            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1757            let parent_handle = context.spawn(move |context| async move {
1758                // Spawn child task that hangs forever, should be aborted when parent completes
1759                let handle = context.spawn(|_| pending::<()>());
1760
1761                // Store child task handle so we can test it later
1762                *child_handle2.lock() = Some(handle);
1763
1764                // Parent task completes
1765                parent_complete_rx.await.unwrap();
1766            });
1767
1768            // Fire parent completion
1769            parent_complete_tx.send(()).unwrap();
1770
1771            // Wait for parent task to complete
1772            assert!(parent_handle.await.is_ok());
1773
1774            // Child task should resolve with error since its parent has completed
1775            let child_handle = child_handle.lock().take().unwrap();
1776            assert!(matches!(child_handle.await, Err(Error::Closed)));
1777        });
1778    }
1779
    // Verifies that aborting the root of a three-level task tree cascades to
    // every descendant: all 3 children and 6 grandchildren must resolve with
    // `Error::Closed` after the root is aborted.
    fn test_spawn_cascading_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // We create the following tree of tasks. All tasks will run
            // indefinitely (until aborted).
            //
            //          root
            //     /     |     \
            //    /      |      \
            //   c0      c1      c2
            //  /  \    /  \    /  \
            // g0  g1  g2  g3  g4  g5
            //
            // Clones of each child context are prepared up front; the
            // grandchildren are spawned from clones of their parent's context.
            let c0 = context.clone();
            let g0 = c0.clone();
            let g1 = c0.clone();
            let c1 = context.clone();
            let g2 = c1.clone();
            let g3 = c1.clone();
            let c2 = context.clone();
            let g4 = c2.clone();
            let g5 = c2.clone();

            // Spawn tasks
            let handles = Arc::new(Mutex::new(Vec::new()));
            // One initialization message per spawned descendant (3 + 6 = 9).
            let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
            let root_task = context.spawn({
                let handles = handles.clone();
                move |_| async move {
                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                    {
                        let handle = context.spawn({
                            let handles = handles.clone();
                            let initialized_tx = initialized_tx.clone();
                            move |_| async move {
                                for grandchild in grandchildren {
                                    let handle = grandchild.spawn(|_| async {
                                        pending::<()>().await;
                                    });
                                    handles.lock().push(handle);
                                    initialized_tx.send(()).await.unwrap();
                                }

                                pending::<()>().await;
                            }
                        });
                        handles.lock().push(handle);
                        initialized_tx.send(()).await.unwrap();
                    }

                    pending::<()>().await;
                }
            });

            // Wait for tasks to initialize
            for _ in 0..9 {
                initialized_rx.recv().await.unwrap();
            }

            // Verify we have all 9 handles (3 children + 6 grandchildren)
            assert_eq!(handles.lock().len(), 9);

            // Abort root task
            root_task.abort();
            assert!(matches!(root_task.await, Err(Error::Closed)));

            // All handles should resolve with error due to cascading abort
            let handles = handles.lock().drain(..).collect::<Vec<_>>();
            for handle in handles {
                assert!(matches!(handle.await, Err(Error::Closed)));
            }
        });
    }
1854
    // Verifies task independence: completing one child ("sibling") does not
    // tear down the other child or the parent — each task completes only
    // when its own completion channel fires.
    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Paired channels per task: observe startup, trigger completion,
            // and recover the spawned handle.
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (child_complete_tx, child_complete_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

            let parent = context.spawn(move |context| async move {
                // Spawn a child task
                let child_handle = context.clone().spawn(|_| async move {
                    child_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    child_complete_rx.await.unwrap();
                });
                assert!(
                    child_handle_tx.send(child_handle).is_ok(),
                    "child handle receiver dropped"
                );

                // Spawn an independent sibling task
                let sibling_handle = context.clone().spawn(move |_| async move {
                    sibling_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    sibling_complete_rx.await.unwrap();
                });
                assert!(
                    sibling_handle_tx.send(sibling_handle).is_ok(),
                    "sibling handle receiver dropped"
                );

                // Wait for signal to complete
                parent_complete_rx.await.unwrap();
            });

            // Wait for both to start
            child_started_rx.await.unwrap();
            sibling_started_rx.await.unwrap();

            // Complete the sibling; this must not affect the child or parent
            sibling_complete_tx.send(()).unwrap();
            assert!(sibling_handle_rx.await.is_ok());

            // The child task should still be alive
            child_complete_tx.send(()).unwrap();
            assert!(child_handle_rx.await.is_ok());

            // As well as the parent
            parent_complete_tx.send(()).unwrap();
            assert!(parent.await.is_ok());
        });
    }
1912
    // Verifies that abort propagates down a chain of tasks where each level
    // is spawned from a clone of its parent's context: after aborting the
    // parent, the child and grandchild handles both resolve with an error.
    fn test_spawn_clone_chain<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (parent_started_tx, parent_started_rx) = oneshot::channel();
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();

            // Each level spawns the next from `context.clone()` and parks
            // forever; handles are shipped out through oneshot channels.
            let parent = context.clone().spawn({
                move |context| async move {
                    let child = context.clone().spawn({
                        move |context| async move {
                            let grandchild = context.clone().spawn({
                                move |_| async move {
                                    grandchild_started_tx.send(()).unwrap();
                                    pending::<()>().await;
                                }
                            });
                            assert!(
                                grandchild_handle_tx.send(grandchild).is_ok(),
                                "grandchild handle receiver dropped"
                            );
                            child_started_tx.send(()).unwrap();
                            pending::<()>().await;
                        }
                    });
                    assert!(
                        child_handle_tx.send(child).is_ok(),
                        "child handle receiver dropped"
                    );
                    parent_started_tx.send(()).unwrap();
                    pending::<()>().await;
                }
            });

            // Wait until every level of the chain is running.
            parent_started_rx.await.unwrap();
            child_started_rx.await.unwrap();
            grandchild_started_rx.await.unwrap();

            let child_handle = child_handle_rx.await.unwrap();
            let grandchild_handle = grandchild_handle_rx.await.unwrap();

            // Abort the top of the chain; descendants must be torn down too.
            parent.abort();
            assert!(parent.await.is_err());

            assert!(child_handle.await.is_err());
            assert!(grandchild_handle.await.is_err());
        });
    }
1965
    // Verifies that abort propagation survives intermediate context clones
    // that are never themselves used to spawn: the leaf spawned from the
    // third clone is still torn down when the parent aborts.
    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();

            let parent = context.clone().spawn({
                move |context| async move {
                    // Chain several clones without spawning from the
                    // intermediate ones — the sparseness under test.
                    let clone1 = context.clone();
                    let clone2 = clone1.clone();
                    let clone3 = clone2.clone();

                    let leaf = clone3.clone().spawn({
                        move |_| async move {
                            leaf_started_tx.send(()).unwrap();
                            pending::<()>().await;
                        }
                    });

                    leaf_handle_tx
                        .send(leaf)
                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
                    pending::<()>().await;
                }
            });

            // Wait for the leaf to start, then recover its handle.
            leaf_started_rx.await.unwrap();
            let leaf_handle = leaf_handle_rx.await.unwrap();

            // Aborting the parent must reach the leaf through the clone chain.
            parent.abort();
            assert!(parent.await.is_err());
            assert!(leaf_handle.await.is_err());
        });
    }
2002
2003    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2004    where
2005        R::Context: Spawner,
2006    {
2007        runner.start(|context| async move {
2008            let context = if dedicated {
2009                context.dedicated()
2010            } else {
2011                context.shared(true)
2012            };
2013
2014            let handle = context.spawn(|_| async move { 42 });
2015            let result = handle.await;
2016            assert!(matches!(result, Ok(42)));
2017        });
2018    }
2019
2020    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2021    where
2022        R::Context: Spawner + Clock,
2023    {
2024        runner.start(|context| async move {
2025            let context = if dedicated {
2026                context.dedicated()
2027            } else {
2028                context.shared(true)
2029            };
2030
2031            context.clone().spawn(|_| async move {
2032                panic!("blocking task panicked");
2033            });
2034
2035            // Loop until panic
2036            loop {
2037                context.sleep(Duration::from_millis(100)).await;
2038            }
2039        });
2040    }
2041
2042    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2043    where
2044        R::Context: Spawner + Clock,
2045    {
2046        let result: Result<(), Error> = runner.start(|context| async move {
2047            let context = if dedicated {
2048                context.dedicated()
2049            } else {
2050                context.shared(true)
2051            };
2052
2053            let handle = context.clone().spawn(|_| async move {
2054                panic!("blocking task panicked");
2055            });
2056            handle.await
2057        });
2058        assert!(matches!(result, Err(Error::Exited)));
2059    }
2060
    // Verifies that tasks holding channel endpoints which reference each
    // other (task1 holds tx2/rx1, task2 holds tx1/rx2) do not leak when the
    // runtime that owns them is dropped: after the inner deterministic
    // runtime finishes, the tracked `dropper` Arc must be uniquely owned.
    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Setup tracked resource
            let dropper = Arc::new(());
            let executor = deterministic::Runner::default();
            executor.start({
                let dropper = dropper.clone();
                move |context| async move {
                    // Create tasks with circular dependencies through channels
                    let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                    let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                    let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                    // Task 1 holds tx2 and waits on rx1
                    context.with_label("task1").spawn({
                        let setup_tx = setup_tx.clone();
                        let dropper = dropper.clone();
                        move |_| async move {
                            // Setup deadlock and mark ready
                            tx2.send(()).unwrap();
                            rx1.recv().await.unwrap();
                            setup_tx.send(()).unwrap();

                            // Wait forever (the peer never sends again, so the
                            // explicit drops below are unreachable; the Arc
                            // clone is freed when the task future is dropped)
                            while rx1.recv().await.is_some() {}
                            drop(tx2);
                            drop(dropper);
                        }
                    });

                    // Task 2 holds tx1 and waits on rx2
                    context.with_label("task2").spawn(move |_| async move {
                        // Setup deadlock and mark ready
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Wait forever
                        while rx2.recv().await.is_some() {}
                        drop(tx1);
                        drop(dropper);
                    });

                    // Wait for tasks to start
                    setup_rx.recv().await.unwrap();
                    setup_rx.recv().await.unwrap();
                }
            });

            // After runtime drop, both tasks should be cleaned up
            Arc::try_unwrap(dropper).expect("references remaining");
        });
    }
2114
    // Verifies that waking a task AFTER its runtime has been torn down is
    // safe: a waker captured inside the runtime is invoked (via `WakeOnDrop`)
    // only after `runner.start` has returned.
    fn test_late_waker<R: Runner>(runner: R)
    where
        R::Context: Metrics + Spawner,
    {
        // A future that captures its waker and sends it to the caller, then
        // stays pending forever.
        struct CaptureWaker {
            tx: Option<oneshot::Sender<Waker>>,
            sent: bool,
        }
        impl Future for CaptureWaker {
            type Output = ();
            // `TContext` is the task-polling context type (aliased to avoid
            // clashing with the runtime's own `Context` — assumed; confirm
            // against the alias at the top of this module).
            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
                if !self.sent {
                    if let Some(tx) = self.tx.take() {
                        // Send a clone of the current task's waker to the root
                        let _ = tx.send(cx.waker().clone());
                    }
                    self.sent = true;
                }
                Poll::Pending
            }
        }

        // A guard that wakes the captured waker on drop.
        struct WakeOnDrop(Option<Waker>);
        impl Drop for WakeOnDrop {
            fn drop(&mut self) {
                if let Some(w) = self.0.take() {
                    w.wake_by_ref();
                }
            }
        }

        // Run the executor to completion
        let holder = runner.start(|context| async move {
            // Wire a oneshot to receive the task waker.
            let (tx, rx) = oneshot::channel::<Waker>();

            // Spawn a task that registers its waker and then stays pending.
            context
                .with_label("capture_waker")
                .spawn(move |_| async move {
                    CaptureWaker {
                        tx: Some(tx),
                        sent: false,
                    }
                    .await;
                });

            // Ensure the spawned task runs and registers its waker.
            utils::reschedule().await;

            // Receive the waker from the spawned task.
            let waker = rx.await.expect("waker not received");

            // Return a guard that will wake after the runtime has dropped.
            WakeOnDrop(Some(waker))
        });

        // Dropping the guard after the runtime has torn down will trigger a wake on
        // a task whose executor has been dropped. The test passes if this
        // does not panic or corrupt state.
        drop(holder);
    }
2179
2180    fn test_metrics<R: Runner>(runner: R)
2181    where
2182        R::Context: Metrics,
2183    {
2184        runner.start(|context| async move {
2185            // Assert label
2186            assert_eq!(context.label(), "");
2187
2188            // Register a metric
2189            let counter = Counter::<u64>::default();
2190            context.register("test", "test", counter.clone());
2191
2192            // Increment the counter
2193            counter.inc();
2194
2195            // Encode metrics
2196            let buffer = context.encode();
2197            assert!(buffer.contains("test_total 1"));
2198
2199            // Nested context
2200            let context = context.with_label("nested");
2201            let nested_counter = Counter::<u64>::default();
2202            context.register("test", "test", nested_counter.clone());
2203
2204            // Increment the counter
2205            nested_counter.inc();
2206
2207            // Encode metrics
2208            let buffer = context.encode();
2209            assert!(buffer.contains("nested_test_total 1"));
2210            assert!(buffer.contains("test_total 1"));
2211        });
2212    }
2213
    // Verifies attribute handling in metrics: attributes become Prometheus
    // labels, the same metric name registered under different attribute
    // values yields distinct series with a single HELP/TYPE header, and
    // multiple attributes are emitted in sorted order.
    fn test_metrics_with_attribute<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Create context with an attribute
            let ctx_epoch5 = context
                .with_label("consensus")
                .with_attribute("epoch", "e5");

            // Register a metric with the attribute
            let counter = Counter::<u64>::default();
            ctx_epoch5.register("votes", "vote count", counter.clone());
            counter.inc();

            // Encode and verify the attribute appears as a label
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch attribute, got: {}",
                buffer
            );

            // Create context with different epoch attribute (same metric name)
            let ctx_epoch6 = context
                .with_label("consensus")
                .with_attribute("epoch", "e6");
            let counter2 = Counter::<u64>::default();
            ctx_epoch6.register("votes", "vote count", counter2.clone());
            counter2.inc();
            counter2.inc();

            // Both should appear in encoded output with canonical format (single HELP/TYPE)
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch=e5, got: {}",
                buffer
            );
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
                "Expected metric with epoch=e6, got: {}",
                buffer
            );

            // Verify canonical format: HELP and TYPE should appear exactly once
            assert_eq!(
                buffer.matches("# HELP consensus_votes").count(),
                1,
                "HELP should appear exactly once, got: {}",
                buffer
            );
            assert_eq!(
                buffer.matches("# TYPE consensus_votes").count(),
                1,
                "TYPE should appear exactly once, got: {}",
                buffer
            );

            // Multiple attributes (registered: region=us, instance=i1;
            // expected output is sorted by attribute name)
            let ctx_multi = context
                .with_label("engine")
                .with_attribute("region", "us")
                .with_attribute("instance", "i1");
            let counter3 = Counter::<u64>::default();
            ctx_multi.register("requests", "request count", counter3.clone());
            counter3.inc();

            let buffer = context.encode();
            assert!(
                buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
                "Expected metric with sorted attributes, got: {}",
                buffer
            );
        });
    }
2290
2291    #[test]
2292    fn test_deterministic_metrics_with_attribute() {
2293        let executor = deterministic::Runner::default();
2294        test_metrics_with_attribute(executor);
2295    }
2296
2297    #[test]
2298    fn test_tokio_metrics_with_attribute() {
2299        let runner = tokio::Runner::default();
2300        test_metrics_with_attribute(runner);
2301    }
2302
    // Verifies that attributes set on a context survive subsequent
    // `with_label` nesting, including attributes added at multiple levels
    // (all are emitted, sorted by name).
    fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Create context with attribute, then nest a label
            let ctx = context
                .with_label("orchestrator")
                .with_attribute("epoch", "e5")
                .with_label("engine");

            // Register a metric
            let counter = Counter::<u64>::default();
            ctx.register("votes", "vote count", counter.clone());
            counter.inc();

            // Verify the attribute is preserved through the nested label
            let buffer = context.encode();
            assert!(
                buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with preserved epoch attribute, got: {}",
                buffer
            );

            // Multiple levels of nesting with attributes at different levels
            let ctx2 = context
                .with_label("outer")
                .with_attribute("region", "us")
                .with_label("middle")
                .with_attribute("az", "east")
                .with_label("inner");

            let counter2 = Counter::<u64>::default();
            ctx2.register("requests", "request count", counter2.clone());
            counter2.inc();
            counter2.inc();

            // Both attributes appear, sorted by name, under the fully
            // concatenated label prefix
            let buffer = context.encode();
            assert!(
                buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
                "Expected metric with all attributes preserved and sorted, got: {}",
                buffer
            );
        });
    }
2348
    // Run the shared nested-label attribute test on both runtimes.
    #[test]
    fn test_deterministic_metrics_attribute_with_nested_label() {
        let executor = deterministic::Runner::default();
        test_metrics_attribute_with_nested_label(executor);
    }

    #[test]
    fn test_tokio_metrics_attribute_with_nested_label() {
        let runner = tokio::Runner::default();
        test_metrics_attribute_with_nested_label(runner);
    }
2360
2361    fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2362    where
2363        R::Context: Metrics,
2364    {
2365        runner.start(|context| async move {
2366            // Create two separate sub-contexts, each with their own attribute
2367            let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
2368            let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);
2369
2370            // Register metrics in ctx_a
2371            let c1 = Counter::<u64>::default();
2372            ctx_a.register("requests", "help", c1);
2373
2374            // Register metrics in ctx_b
2375            let c2 = Counter::<u64>::default();
2376            ctx_b.register("requests", "help", c2);
2377
2378            // Register another metric in ctx_a AFTER ctx_b was used
2379            let c3 = Counter::<u64>::default();
2380            ctx_a.register("errors", "help", c3);
2381
2382            let output = context.encode();
2383
2384            // ctx_a metrics should only have epoch=1
2385            assert!(
2386                output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2387                "ctx_a requests should have epoch=1: {output}"
2388            );
2389            assert!(
2390                output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2391                "ctx_a errors should have epoch=1: {output}"
2392            );
2393            assert!(
2394                !output.contains("component_a_requests_total{epoch=\"2\"}"),
2395                "ctx_a requests should not have epoch=2: {output}"
2396            );
2397
2398            // ctx_b metrics should only have epoch=2
2399            assert!(
2400                output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2401                "ctx_b should have epoch=2: {output}"
2402            );
2403            assert!(
2404                !output.contains("component_b_requests_total{epoch=\"1\"}"),
2405                "ctx_b should not have epoch=1: {output}"
2406            );
2407        });
2408    }
2409
    // Run the attribute-isolation test on both runtimes.
    #[test]
    fn test_deterministic_metrics_attributes_isolated_between_contexts() {
        let executor = deterministic::Runner::default();
        test_metrics_attributes_isolated_between_contexts(executor);
    }

    #[test]
    fn test_tokio_metrics_attributes_isolated_between_contexts() {
        let runner = tokio::Runner::default();
        test_metrics_attributes_isolated_between_contexts(runner);
    }
2421
2422    fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2423    where
2424        R::Context: Metrics,
2425    {
2426        runner.start(|context| async move {
2427            // Create two contexts with same attributes but different order
2428            let ctx_ab = context
2429                .with_label("service")
2430                .with_attribute("region", "us")
2431                .with_attribute("env", "prod");
2432
2433            let ctx_ba = context
2434                .with_label("service")
2435                .with_attribute("env", "prod")
2436                .with_attribute("region", "us");
2437
2438            // Register via first context
2439            let c1 = Counter::<u64>::default();
2440            ctx_ab.register("requests", "help", c1.clone());
2441            c1.inc();
2442
2443            // Register via second context - same attributes, different metric
2444            let c2 = Counter::<u64>::default();
2445            ctx_ba.register("errors", "help", c2.clone());
2446            c2.inc();
2447            c2.inc();
2448
2449            let output = context.encode();
2450
2451            // Both should have the same label order (alphabetically sorted: env, region)
2452            assert!(
2453                output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2454                "requests should have sorted labels: {output}"
2455            );
2456            assert!(
2457                output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2458                "errors should have sorted labels: {output}"
2459            );
2460
2461            // Should NOT have reverse order
2462            assert!(
2463                !output.contains("region=\"us\",env=\"prod\""),
2464                "should not have unsorted label order: {output}"
2465            );
2466        });
2467    }
2468
    // Run the attribute-sorting test on both runtimes.
    #[test]
    fn test_deterministic_metrics_attributes_sorted_deterministically() {
        let executor = deterministic::Runner::default();
        test_metrics_attributes_sorted_deterministically(executor);
    }

    #[test]
    fn test_tokio_metrics_attributes_sorted_deterministically() {
        let runner = tokio::Runner::default();
        test_metrics_attributes_sorted_deterministically(runner);
    }
2480
    // Verifies that labels and attributes stay independent across many
    // sibling and nested contexts: each metric carries exactly the attributes
    // of the context it was registered through, and nothing else.
    fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Service A: plain, no nested labels
            let svc_a = context.with_label("service_a");

            // Service A with attribute (same top-level label, different context)
            let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);

            // Service B with nested label: service_b_worker
            let svc_b_worker = context.with_label("service_b").with_label("worker");

            // Service B with nested label AND attribute
            let svc_b_worker_shard = context
                .with_label("service_b")
                .with_label("worker")
                .with_attribute("shard", 99);

            // Service B different nested label: service_b_manager
            let svc_b_manager = context.with_label("service_b").with_label("manager");

            // Service C: plain, proves no cross-service contamination
            let svc_c = context.with_label("service_c");

            // Register metrics in all contexts
            let c1 = Counter::<u64>::default();
            svc_a.register("requests", "help", c1);

            let c2 = Counter::<u64>::default();
            svc_a_v2.register("requests", "help", c2);

            let c3 = Counter::<u64>::default();
            svc_b_worker.register("tasks", "help", c3);

            let c4 = Counter::<u64>::default();
            svc_b_worker_shard.register("tasks", "help", c4);

            let c5 = Counter::<u64>::default();
            svc_b_manager.register("decisions", "help", c5);

            let c6 = Counter::<u64>::default();
            svc_c.register("requests", "help", c6);

            // None of the counters were incremented, so every series reads 0.
            let output = context.encode();

            // Service A plain and attributed both exist independently
            assert!(
                output.contains("service_a_requests_total 0"),
                "svc_a plain should exist: {output}"
            );
            assert!(
                output.contains("service_a_requests_total{version=\"2\"} 0"),
                "svc_a_v2 should have version=2: {output}"
            );

            // Service B worker: plain and attributed versions
            assert!(
                output.contains("service_b_worker_tasks_total 0"),
                "svc_b_worker plain should exist: {output}"
            );
            assert!(
                output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
                "svc_b_worker_shard should have shard=99: {output}"
            );

            // Service B manager: no attributes
            assert!(
                output.contains("service_b_manager_decisions_total 0"),
                "svc_b_manager should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_b_manager_decisions_total{"),
                "svc_b_manager should have no attributes at all: {output}"
            );

            // Service C: no attributes, no contamination
            assert!(
                output.contains("service_c_requests_total 0"),
                "svc_c should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{"),
                "svc_c should have no attributes at all: {output}"
            );

            // Cross-contamination checks
            assert!(
                !output.contains("service_b_manager_decisions_total{shard="),
                "svc_b_manager should not have shard: {output}"
            );
            assert!(
                !output.contains("service_a_requests_total{shard="),
                "svc_a should not have shard: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{version="),
                "svc_c should not have version: {output}"
            );
        });
    }
2583
    // Run the nested-labels-with-attributes test on both runtimes.
    #[test]
    fn test_deterministic_metrics_nested_labels_with_attributes() {
        let executor = deterministic::Runner::default();
        test_metrics_nested_labels_with_attributes(executor);
    }

    #[test]
    fn test_tokio_metrics_nested_labels_with_attributes() {
        let runner = tokio::Runner::default();
        test_metrics_nested_labels_with_attributes(runner);
    }
2595
    // Verifies that context attributes merge with per-series Family labels:
    // context attributes are emitted first (alphabetically sorted), followed
    // by the Family's own labels, and contexts without attributes emit only
    // the Family labels.
    fn test_metrics_family_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
            struct RequestLabels {
                method: String,
                status: u16,
            }

            // Create context with attribute
            let ctx = context
                .with_label("api")
                .with_attribute("region", "us_east")
                .with_attribute("env", "prod");

            // Register a Family metric
            let requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx.register("requests", "HTTP requests", requests.clone());

            // Increment counters for different label combinations
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 200,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "POST".to_string(),
                    status: 201,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 404,
                })
                .inc();

            let output = context.encode();

            // Context attributes appear first (alphabetically sorted), then Family labels
            // Context attributes: env="prod", region="us_east"
            // Family labels: method, status
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
                ),
                "GET 200 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
                ),
                "POST 201 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
                ),
                "GET 404 should have merged labels: {output}"
            );

            // Create another context WITHOUT attributes to verify isolation
            let ctx_plain = context.with_label("api_plain");
            let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx_plain.register("requests", "HTTP requests", plain_requests.clone());

            plain_requests
                .get_or_create(&RequestLabels {
                    method: "DELETE".to_string(),
                    status: 204,
                })
                .inc();

            let output = context.encode();

            // Plain context should have Family labels but no context attributes
            assert!(
                output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
                "plain DELETE should have only family labels: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{env="),
                "plain should not have env attribute: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{region="),
                "plain should not have region attribute: {output}"
            );
        });
    }
2690
    // Run the Family-with-attributes test on both runtimes.
    #[test]
    fn test_deterministic_metrics_family_with_attributes() {
        let executor = deterministic::Runner::default();
        test_metrics_family_with_attributes(executor);
    }

    #[test]
    fn test_tokio_metrics_family_with_attributes() {
        let runner = tokio::Runner::default();
        test_metrics_family_with_attributes(runner);
    }
2702
2703    fn test_with_scope_register_and_encode<R: Runner>(runner: R)
2704    where
2705        R::Context: Metrics,
2706    {
2707        runner.start(|context| async move {
2708            let scoped = context.with_label("engine").with_scope();
2709            let counter = Counter::<u64>::default();
2710            scoped.register("votes", "vote count", counter.clone());
2711            counter.inc();
2712
2713            let buffer = context.encode();
2714            assert!(
2715                buffer.contains("engine_votes_total 1"),
2716                "scoped metric should appear in encode: {buffer}"
2717            );
2718        });
2719    }
2720
    // Run the scoped register/encode test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_register_and_encode() {
        let executor = deterministic::Runner::default();
        test_with_scope_register_and_encode(executor);
    }

    #[test]
    fn test_tokio_with_scope_register_and_encode() {
        let runner = tokio::Runner::default();
        test_with_scope_register_and_encode(runner);
    }
2732
2733    fn test_with_scope_drop_removes_metrics<R: Runner>(runner: R)
2734    where
2735        R::Context: Metrics,
2736    {
2737        runner.start(|context| async move {
2738            // Register a permanent metric
2739            let permanent = Counter::<u64>::default();
2740            context.with_label("permanent").register(
2741                "counter",
2742                "permanent counter",
2743                permanent.clone(),
2744            );
2745            permanent.inc();
2746
2747            // Register a scoped metric
2748            let scoped = context.with_label("engine").with_scope();
2749            let counter = Counter::<u64>::default();
2750            scoped.register("votes", "vote count", counter.clone());
2751            counter.inc();
2752
2753            // Both should appear
2754            let buffer = context.encode();
2755            assert!(buffer.contains("permanent_counter_total 1"));
2756            assert!(buffer.contains("engine_votes_total 1"));
2757
2758            // Drop the scoped context
2759            drop(scoped);
2760
2761            // Only permanent metric should remain
2762            let buffer = context.encode();
2763            assert!(
2764                buffer.contains("permanent_counter_total 1"),
2765                "permanent metric should survive scope drop: {buffer}"
2766            );
2767            assert!(
2768                !buffer.contains("engine_votes"),
2769                "scoped metric should be removed after scope drop: {buffer}"
2770            );
2771        });
2772    }
2773
    // Run the scope-drop cleanup test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_drop_removes_metrics() {
        let executor = deterministic::Runner::default();
        test_with_scope_drop_removes_metrics(executor);
    }

    #[test]
    fn test_tokio_with_scope_drop_removes_metrics() {
        let runner = tokio::Runner::default();
        test_with_scope_drop_removes_metrics(runner);
    }
2785
2786    fn test_with_scope_attributes<R: Runner>(runner: R)
2787    where
2788        R::Context: Metrics,
2789    {
2790        runner.start(|context| async move {
2791            // Simulate epoch lifecycle
2792            let epoch1 = context
2793                .with_label("engine")
2794                .with_attribute("epoch", 1)
2795                .with_scope();
2796            let c1 = Counter::<u64>::default();
2797            epoch1.register("votes", "vote count", c1.clone());
2798            c1.inc();
2799
2800            let epoch2 = context
2801                .with_label("engine")
2802                .with_attribute("epoch", 2)
2803                .with_scope();
2804            let c2 = Counter::<u64>::default();
2805            epoch2.register("votes", "vote count", c2.clone());
2806            c2.inc();
2807            c2.inc();
2808
2809            // Both epochs visible
2810            let buffer = context.encode();
2811            assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
2812            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2813
2814            // HELP/TYPE lines should be deduplicated across scopes
2815            assert_eq!(
2816                buffer.matches("# HELP engine_votes").count(),
2817                1,
2818                "HELP should appear once: {buffer}"
2819            );
2820            assert_eq!(
2821                buffer.matches("# TYPE engine_votes").count(),
2822                1,
2823                "TYPE should appear once: {buffer}"
2824            );
2825
2826            // Drop epoch 1 context
2827            drop(epoch1);
2828            let buffer = context.encode();
2829            assert!(
2830                !buffer.contains("epoch=\"1\""),
2831                "epoch 1 should be gone: {buffer}"
2832            );
2833            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2834
2835            // Drop epoch 2 context
2836            drop(epoch2);
2837            let buffer = context.encode();
2838            assert!(
2839                !buffer.contains("engine_votes"),
2840                "all epoch metrics should be gone: {buffer}"
2841            );
2842        });
2843    }
2844
    // Run the scoped-attribute lifecycle test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_attributes() {
        let executor = deterministic::Runner::default();
        test_with_scope_attributes(executor);
    }

    #[test]
    fn test_tokio_with_scope_attributes() {
        let runner = tokio::Runner::default();
        test_with_scope_attributes(runner);
    }
2856
2857    fn test_with_scope_inherits_on_with_label<R: Runner>(runner: R)
2858    where
2859        R::Context: Metrics,
2860    {
2861        runner.start(|context| async move {
2862            let scoped = context.with_label("engine").with_scope();
2863
2864            // Child context inherits scope
2865            let child = scoped.with_label("batcher");
2866            let counter = Counter::<u64>::default();
2867            child.register("msgs", "message count", counter.clone());
2868            counter.inc();
2869
2870            let buffer = context.encode();
2871            assert!(buffer.contains("engine_batcher_msgs_total 1"));
2872
2873            // Dropping all scoped contexts removes all metrics in the scope
2874            drop(child);
2875            drop(scoped);
2876            let buffer = context.encode();
2877            assert!(
2878                !buffer.contains("engine_batcher_msgs"),
2879                "child metric should be removed with scope: {buffer}"
2880            );
2881        });
2882    }
2883
    // Run the scope-inheritance-via-with_label test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_inherits_on_with_label() {
        let executor = deterministic::Runner::default();
        test_with_scope_inherits_on_with_label(executor);
    }

    #[test]
    fn test_tokio_with_scope_inherits_on_with_label() {
        let runner = tokio::Runner::default();
        test_with_scope_inherits_on_with_label(runner);
    }
2895
2896    fn test_multiple_scopes<R: Runner>(runner: R)
2897    where
2898        R::Context: Metrics,
2899    {
2900        runner.start(|context| async move {
2901            let ctx_a = context.with_label("a").with_scope();
2902            let ctx_b = context.with_label("b").with_scope();
2903
2904            let ca = Counter::<u64>::default();
2905            ctx_a.register("counter", "a counter", ca.clone());
2906            ca.inc();
2907
2908            let cb = Counter::<u64>::default();
2909            ctx_b.register("counter", "b counter", cb.clone());
2910            cb.inc();
2911            cb.inc();
2912
2913            let buffer = context.encode();
2914            assert!(buffer.contains("a_counter_total 1"));
2915            assert!(buffer.contains("b_counter_total 2"));
2916
2917            // Drop only scope a
2918            drop(ctx_a);
2919            let buffer = context.encode();
2920            assert!(!buffer.contains("a_counter"));
2921            assert!(buffer.contains("b_counter_total 2"));
2922
2923            // Drop scope b
2924            drop(ctx_b);
2925            let buffer = context.encode();
2926            assert!(!buffer.contains("b_counter"));
2927        });
2928    }
2929
    // Run the multiple-independent-scopes test on both runtimes.
    #[test]
    fn test_deterministic_multiple_scopes() {
        let executor = deterministic::Runner::default();
        test_multiple_scopes(executor);
    }

    #[test]
    fn test_tokio_multiple_scopes() {
        let runner = tokio::Runner::default();
        test_multiple_scopes(runner);
    }
2941
2942    fn test_encode_single_eof<R: Runner>(runner: R)
2943    where
2944        R::Context: Metrics,
2945    {
2946        runner.start(|context| async move {
2947            let root_counter = Counter::<u64>::default();
2948            context.register("root", "root metric", root_counter.clone());
2949            root_counter.inc();
2950
2951            let scoped = context.with_label("engine").with_scope();
2952            let scoped_counter = Counter::<u64>::default();
2953            scoped.register("ops", "scoped metric", scoped_counter.clone());
2954            scoped_counter.inc();
2955
2956            let buffer = context.encode();
2957            assert!(
2958                buffer.contains("root_total 1"),
2959                "root metric missing: {buffer}"
2960            );
2961            assert!(
2962                buffer.contains("engine_ops_total 1"),
2963                "scoped metric missing: {buffer}"
2964            );
2965            assert_eq!(
2966                buffer.matches("# EOF").count(),
2967                1,
2968                "expected exactly one EOF marker: {buffer}"
2969            );
2970            assert!(
2971                buffer.ends_with("# EOF\n"),
2972                "EOF must be the last line: {buffer}"
2973            );
2974        });
2975    }
2976
    // Run the single-EOF encoding test on both runtimes.
    #[test]
    fn test_deterministic_encode_single_eof() {
        let executor = deterministic::Runner::default();
        test_encode_single_eof(executor);
    }

    #[test]
    fn test_tokio_encode_single_eof() {
        let runner = tokio::Runner::default();
        test_encode_single_eof(runner);
    }
2988
2989    fn test_with_scope_nested_inherits<R: Runner>(runner: R)
2990    where
2991        R::Context: Metrics,
2992    {
2993        runner.start(|context| async move {
2994            let scoped = context.with_label("engine").with_scope();
2995
2996            // Calling with_scope() on an already-scoped context inherits the scope
2997            let nested = scoped.with_scope();
2998            let counter = Counter::<u64>::default();
2999            nested.register("votes", "vote count", counter.clone());
3000            counter.inc();
3001
3002            let buffer = context.encode();
3003            assert!(
3004                buffer.contains("engine_votes_total 1"),
3005                "nested scope should inherit parent scope: {buffer}"
3006            );
3007
3008            // Dropping the nested context alone should NOT clean up metrics
3009            // because the parent scoped context still holds the Arc
3010            drop(nested);
3011            let buffer = context.encode();
3012            assert!(
3013                buffer.contains("engine_votes_total 1"),
3014                "metrics should survive as long as any scope clone exists: {buffer}"
3015            );
3016
3017            // Dropping the parent scoped context cleans up
3018            drop(scoped);
3019            let buffer = context.encode();
3020            assert!(
3021                !buffer.contains("engine_votes"),
3022                "metrics should be removed when all scope clones are dropped: {buffer}"
3023            );
3024        });
3025    }
3026
    // Run the nested with_scope inheritance test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_nested_inherits() {
        let executor = deterministic::Runner::default();
        test_with_scope_nested_inherits(executor);
    }

    #[test]
    fn test_tokio_with_scope_nested_inherits() {
        let runner = tokio::Runner::default();
        test_with_scope_nested_inherits(runner);
    }
3038
3039    #[test]
3040    #[should_panic(expected = "duplicate metric:")]
3041    fn test_deterministic_reregister_after_scope_drop() {
3042        let executor = deterministic::Runner::default();
3043        executor.start(|context| async move {
3044            let scoped = context
3045                .with_label("engine")
3046                .with_attribute("epoch", 1)
3047                .with_scope();
3048            let c1 = Counter::<u64>::default();
3049            scoped.register("votes", "vote count", c1);
3050            drop(scoped);
3051
3052            // Re-registering the same key after scope drop is not allowed
3053            let scoped2 = context
3054                .with_label("engine")
3055                .with_attribute("epoch", 1)
3056                .with_scope();
3057            let c2 = Counter::<u64>::default();
3058            scoped2.register("votes", "vote count", c2);
3059        });
3060    }
3061
    // Verifies that a Family registered through a scoped, attributed context
    // merges the scope's attributes with each series' labels, and that
    // dropping the scope removes every series of the family.
    fn test_with_scope_family_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
        struct Peer {
            name: String,
        }
        // Manual label-set impl that emits a single peer="<name>" label.
        impl EncodeLabelSet for Peer {
            fn encode(
                &self,
                encoder: &mut prometheus_client::encoding::LabelSetEncoder<'_>,
            ) -> Result<(), std::fmt::Error> {
                let mut label = encoder.encode_label();
                let mut key = label.encode_label_key()?;
                EncodeLabelKey::encode(&"peer", &mut key)?;
                let mut value = key.encode_label_value()?;
                EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
                value.finish()
            }
        }

        runner.start(|context| async move {
            let scoped = context
                .with_label("batcher")
                .with_attribute("epoch", 1)
                .with_scope();

            // One series per peer, all under the scope's epoch attribute.
            let family: Family<Peer, Counter> = Family::default();
            scoped.register("votes", "votes per peer", family.clone());
            family
                .get_or_create(&Peer {
                    name: "alice".into(),
                })
                .inc();
            family.get_or_create(&Peer { name: "bob".into() }).inc();

            let buffer = context.encode();
            assert!(
                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
                "family with attributes should combine labels: {buffer}"
            );
            assert!(
                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
                "family with attributes should combine labels: {buffer}"
            );

            // Dropping the scope removes every series in the family.
            drop(scoped);
            let buffer = context.encode();
            assert!(
                !buffer.contains("batcher_votes"),
                "family metrics should be removed: {buffer}"
            );
        });
    }
3117
    // Run the scoped-family-with-attributes test on both runtimes.
    #[test]
    fn test_deterministic_with_scope_family_with_attributes() {
        let executor = deterministic::Runner::default();
        test_with_scope_family_with_attributes(executor);
    }

    #[test]
    fn test_tokio_with_scope_family_with_attributes() {
        let runner = tokio::Runner::default();
        test_with_scope_family_with_attributes(runner);
    }
3129
    // Deterministic-runtime wrappers over shared runtime test bodies defined
    // elsewhere in this module (futures, clock, spawn/abort, panics, select,
    // storage).
    #[test]
    fn test_deterministic_future() {
        let runner = deterministic::Runner::default();
        test_error_future(runner);
    }

    #[test]
    fn test_deterministic_clock_sleep() {
        let executor = deterministic::Runner::default();
        test_clock_sleep(executor);
    }

    #[test]
    fn test_deterministic_clock_sleep_until() {
        let executor = deterministic::Runner::default();
        test_clock_sleep_until(executor);
    }

    #[test]
    fn test_deterministic_clock_timeout() {
        let executor = deterministic::Runner::default();
        test_clock_timeout(executor);
    }

    #[test]
    fn test_deterministic_root_finishes() {
        let executor = deterministic::Runner::default();
        test_root_finishes(executor);
    }

    #[test]
    fn test_deterministic_spawn_after_abort() {
        let executor = deterministic::Runner::default();
        test_spawn_after_abort(executor);
    }

    #[test]
    fn test_deterministic_spawn_abort() {
        let executor = deterministic::Runner::default();
        test_spawn_abort(executor, false, false);
    }

    #[test]
    #[should_panic(expected = "blah")]
    fn test_deterministic_panic_aborts_root() {
        let runner = deterministic::Runner::default();
        test_panic_aborts_root(runner);
    }

    // Even with panic catching enabled, a panic on the root task aborts.
    #[test]
    #[should_panic(expected = "blah")]
    fn test_deterministic_panic_aborts_root_caught() {
        let cfg = deterministic::Config::default().with_catch_panics(true);
        let runner = deterministic::Runner::new(cfg);
        test_panic_aborts_root(runner);
    }

    #[test]
    #[should_panic(expected = "blah")]
    fn test_deterministic_panic_aborts_spawn() {
        let executor = deterministic::Runner::default();
        test_panic_aborts_spawn(executor);
    }

    #[test]
    fn test_deterministic_panic_aborts_spawn_caught() {
        let cfg = deterministic::Config::default().with_catch_panics(true);
        let executor = deterministic::Runner::new(cfg);
        test_panic_aborts_spawn_caught(executor);
    }

    #[test]
    #[should_panic(expected = "boom")]
    fn test_deterministic_multiple_panics() {
        let executor = deterministic::Runner::default();
        test_multiple_panics(executor);
    }

    #[test]
    fn test_deterministic_multiple_panics_caught() {
        let cfg = deterministic::Config::default().with_catch_panics(true);
        let executor = deterministic::Runner::new(cfg);
        test_multiple_panics_caught(executor);
    }

    #[test]
    fn test_deterministic_select() {
        let executor = deterministic::Runner::default();
        test_select(executor);
    }

    #[test]
    fn test_deterministic_select_loop() {
        let executor = deterministic::Runner::default();
        test_select_loop(executor);
    }

    #[test]
    fn test_deterministic_storage_operations() {
        let executor = deterministic::Runner::default();
        test_storage_operations(executor);
    }
3232
3233    #[test]
3234    fn test_deterministic_blob_read_write() {
3235        let executor = deterministic::Runner::default();
3236        test_blob_read_write(executor);
3237    }
3238
3239    #[test]
3240    fn test_deterministic_blob_resize() {
3241        let executor = deterministic::Runner::default();
3242        test_blob_resize(executor);
3243    }
3244
3245    #[test]
3246    fn test_deterministic_many_partition_read_write() {
3247        let executor = deterministic::Runner::default();
3248        test_many_partition_read_write(executor);
3249    }
3250
3251    #[test]
3252    fn test_deterministic_blob_read_past_length() {
3253        let executor = deterministic::Runner::default();
3254        test_blob_read_past_length(executor);
3255    }
3256
3257    #[test]
3258    fn test_deterministic_blob_clone_and_concurrent_read() {
3259        // Run test
3260        let executor = deterministic::Runner::default();
3261        test_blob_clone_and_concurrent_read(executor);
3262    }
3263
3264    #[test]
3265    fn test_deterministic_shutdown() {
3266        let executor = deterministic::Runner::default();
3267        test_shutdown(executor);
3268    }
3269
3270    #[test]
3271    fn test_deterministic_shutdown_multiple_signals() {
3272        let executor = deterministic::Runner::default();
3273        test_shutdown_multiple_signals(executor);
3274    }
3275
3276    #[test]
3277    fn test_deterministic_shutdown_timeout() {
3278        let executor = deterministic::Runner::default();
3279        test_shutdown_timeout(executor);
3280    }
3281
3282    #[test]
3283    fn test_deterministic_shutdown_multiple_stop_calls() {
3284        let executor = deterministic::Runner::default();
3285        test_shutdown_multiple_stop_calls(executor);
3286    }
3287
3288    #[test]
3289    fn test_deterministic_unfulfilled_shutdown() {
3290        let executor = deterministic::Runner::default();
3291        test_unfulfilled_shutdown(executor);
3292    }
3293
3294    #[test]
3295    fn test_deterministic_spawn_dedicated() {
3296        let executor = deterministic::Runner::default();
3297        test_spawn_dedicated(executor);
3298    }
3299
3300    #[test]
3301    fn test_deterministic_spawn() {
3302        let runner = deterministic::Runner::default();
3303        test_spawn(runner);
3304    }
3305
3306    #[test]
3307    fn test_deterministic_spawn_abort_on_parent_abort() {
3308        let runner = deterministic::Runner::default();
3309        test_spawn_abort_on_parent_abort(runner);
3310    }
3311
3312    #[test]
3313    fn test_deterministic_spawn_abort_on_parent_completion() {
3314        let runner = deterministic::Runner::default();
3315        test_spawn_abort_on_parent_completion(runner);
3316    }
3317
3318    #[test]
3319    fn test_deterministic_spawn_cascading_abort() {
3320        let runner = deterministic::Runner::default();
3321        test_spawn_cascading_abort(runner);
3322    }
3323
3324    #[test]
3325    fn test_deterministic_child_survives_sibling_completion() {
3326        let runner = deterministic::Runner::default();
3327        test_child_survives_sibling_completion(runner);
3328    }
3329
3330    #[test]
3331    fn test_deterministic_spawn_clone_chain() {
3332        let runner = deterministic::Runner::default();
3333        test_spawn_clone_chain(runner);
3334    }
3335
3336    #[test]
3337    fn test_deterministic_spawn_sparse_clone_chain() {
3338        let runner = deterministic::Runner::default();
3339        test_spawn_sparse_clone_chain(runner);
3340    }
3341
3342    #[test]
3343    fn test_deterministic_spawn_blocking() {
3344        for dedicated in [false, true] {
3345            let executor = deterministic::Runner::default();
3346            test_spawn_blocking(executor, dedicated);
3347        }
3348    }
3349
3350    #[test]
3351    #[should_panic(expected = "blocking task panicked")]
3352    fn test_deterministic_spawn_blocking_panic() {
3353        for dedicated in [false, true] {
3354            let executor = deterministic::Runner::default();
3355            test_spawn_blocking_panic(executor, dedicated);
3356        }
3357    }
3358
3359    #[test]
3360    fn test_deterministic_spawn_blocking_panic_caught() {
3361        for dedicated in [false, true] {
3362            let cfg = deterministic::Config::default().with_catch_panics(true);
3363            let executor = deterministic::Runner::new(cfg);
3364            test_spawn_blocking_panic_caught(executor, dedicated);
3365        }
3366    }
3367
3368    #[test]
3369    fn test_deterministic_spawn_blocking_abort() {
3370        for (dedicated, blocking) in [(false, true), (true, false)] {
3371            let executor = deterministic::Runner::default();
3372            test_spawn_abort(executor, dedicated, blocking);
3373        }
3374    }
3375
3376    #[test]
3377    fn test_deterministic_circular_reference_prevents_cleanup() {
3378        let executor = deterministic::Runner::default();
3379        test_circular_reference_prevents_cleanup(executor);
3380    }
3381
3382    #[test]
3383    fn test_deterministic_late_waker() {
3384        let executor = deterministic::Runner::default();
3385        test_late_waker(executor);
3386    }
3387
3388    #[test]
3389    fn test_deterministic_metrics() {
3390        let executor = deterministic::Runner::default();
3391        test_metrics(executor);
3392    }
3393
    #[test_collect_traces]
    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
        // Verify that `instrumented()` spawns attach a `task` span carrying the
        // task's label, and that a nested spawn's span name is the parent label
        // joined with its own (`test` + `inner` -> `test_inner`).
        let executor = deterministic::Runner::new(deterministic::Config::default());
        executor.start(|context| async move {
            context
                .with_label("test")
                .instrumented()
                .spawn(|context| async move {
                    // Emitted inside the outer instrumented task.
                    tracing::info!(field = "test field", "test log");

                    context
                        .with_label("inner")
                        .instrumented()
                        .spawn(|_| async move {
                            // Emitted inside the nested instrumented task.
                            tracing::info!("inner log");
                        })
                        .await
                        .unwrap();
                })
                .await
                .unwrap();
        });

        // Exactly the two INFO events above should have been captured.
        let info_traces = traces.get_by_level(Level::INFO);
        assert_eq!(info_traces.len(), 2);

        // Outer log (single span)
        info_traces
            .expect_event_at_index(0, |event| {
                event.metadata.expect_content_exact("test log")?;
                event.metadata.expect_field_count(1)?;
                event.metadata.expect_field_exact("field", "test field")?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test")
                })
            })
            .unwrap();

        // Inner log: still a single span, but named with the combined label.
        info_traces
            .expect_event_at_index(1, |event| {
                event.metadata.expect_content_exact("inner log")?;
                event.metadata.expect_field_count(0)?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test_inner")
                })
            })
            .unwrap();
    }
3448
3449    #[test]
3450    fn test_deterministic_resolver() {
3451        let executor = deterministic::Runner::default();
3452        executor.start(|context| async move {
3453            // Register DNS mappings
3454            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3455            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
3456            context.resolver_register("example.com", Some(vec![ip1, ip2]));
3457
3458            // Resolve registered hostname
3459            let addrs = context.resolve("example.com").await.unwrap();
3460            assert_eq!(addrs, vec![ip1, ip2]);
3461
3462            // Resolve unregistered hostname
3463            let result = context.resolve("unknown.com").await;
3464            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3465
3466            // Remove mapping
3467            context.resolver_register("example.com", None);
3468            let result = context.resolve("example.com").await;
3469            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3470        });
3471    }
3472
3473    #[test]
3474    fn test_tokio_error_future() {
3475        let runner = tokio::Runner::default();
3476        test_error_future(runner);
3477    }
3478
3479    #[test]
3480    fn test_tokio_clock_sleep() {
3481        let executor = tokio::Runner::default();
3482        test_clock_sleep(executor);
3483    }
3484
3485    #[test]
3486    fn test_tokio_clock_sleep_until() {
3487        let executor = tokio::Runner::default();
3488        test_clock_sleep_until(executor);
3489    }
3490
3491    #[test]
3492    fn test_tokio_clock_timeout() {
3493        let executor = tokio::Runner::default();
3494        test_clock_timeout(executor);
3495    }
3496
3497    #[test]
3498    fn test_tokio_root_finishes() {
3499        let executor = tokio::Runner::default();
3500        test_root_finishes(executor);
3501    }
3502
3503    #[test]
3504    fn test_tokio_spawn_after_abort() {
3505        let executor = tokio::Runner::default();
3506        test_spawn_after_abort(executor);
3507    }
3508
3509    #[test]
3510    fn test_tokio_spawn_abort() {
3511        let executor = tokio::Runner::default();
3512        test_spawn_abort(executor, false, false);
3513    }
3514
3515    #[test]
3516    #[should_panic(expected = "blah")]
3517    fn test_tokio_panic_aborts_root() {
3518        let executor = tokio::Runner::default();
3519        test_panic_aborts_root(executor);
3520    }
3521
3522    #[test]
3523    #[should_panic(expected = "blah")]
3524    fn test_tokio_panic_aborts_root_caught() {
3525        let cfg = tokio::Config::default().with_catch_panics(true);
3526        let executor = tokio::Runner::new(cfg);
3527        test_panic_aborts_root(executor);
3528    }
3529
3530    #[test]
3531    #[should_panic(expected = "blah")]
3532    fn test_tokio_panic_aborts_spawn() {
3533        let executor = tokio::Runner::default();
3534        test_panic_aborts_spawn(executor);
3535    }
3536
3537    #[test]
3538    fn test_tokio_panic_aborts_spawn_caught() {
3539        let cfg = tokio::Config::default().with_catch_panics(true);
3540        let executor = tokio::Runner::new(cfg);
3541        test_panic_aborts_spawn_caught(executor);
3542    }
3543
3544    #[test]
3545    #[should_panic(expected = "boom")]
3546    fn test_tokio_multiple_panics() {
3547        let executor = tokio::Runner::default();
3548        test_multiple_panics(executor);
3549    }
3550
3551    #[test]
3552    fn test_tokio_multiple_panics_caught() {
3553        let cfg = tokio::Config::default().with_catch_panics(true);
3554        let executor = tokio::Runner::new(cfg);
3555        test_multiple_panics_caught(executor);
3556    }
3557
3558    #[test]
3559    fn test_tokio_select() {
3560        let executor = tokio::Runner::default();
3561        test_select(executor);
3562    }
3563
3564    #[test]
3565    fn test_tokio_select_loop() {
3566        let executor = tokio::Runner::default();
3567        test_select_loop(executor);
3568    }
3569
3570    #[test]
3571    fn test_tokio_storage_operations() {
3572        let executor = tokio::Runner::default();
3573        test_storage_operations(executor);
3574    }
3575
3576    #[test]
3577    fn test_tokio_blob_read_write() {
3578        let executor = tokio::Runner::default();
3579        test_blob_read_write(executor);
3580    }
3581
3582    #[test]
3583    fn test_tokio_blob_resize() {
3584        let executor = tokio::Runner::default();
3585        test_blob_resize(executor);
3586    }
3587
3588    #[test]
3589    fn test_tokio_many_partition_read_write() {
3590        let executor = tokio::Runner::default();
3591        test_many_partition_read_write(executor);
3592    }
3593
3594    #[test]
3595    fn test_tokio_blob_read_past_length() {
3596        let executor = tokio::Runner::default();
3597        test_blob_read_past_length(executor);
3598    }
3599
3600    #[test]
3601    fn test_tokio_blob_clone_and_concurrent_read() {
3602        // Run test
3603        let executor = tokio::Runner::default();
3604        test_blob_clone_and_concurrent_read(executor);
3605    }
3606
3607    #[test]
3608    fn test_tokio_shutdown() {
3609        let executor = tokio::Runner::default();
3610        test_shutdown(executor);
3611    }
3612
3613    #[test]
3614    fn test_tokio_shutdown_multiple_signals() {
3615        let executor = tokio::Runner::default();
3616        test_shutdown_multiple_signals(executor);
3617    }
3618
3619    #[test]
3620    fn test_tokio_shutdown_timeout() {
3621        let executor = tokio::Runner::default();
3622        test_shutdown_timeout(executor);
3623    }
3624
3625    #[test]
3626    fn test_tokio_shutdown_multiple_stop_calls() {
3627        let executor = tokio::Runner::default();
3628        test_shutdown_multiple_stop_calls(executor);
3629    }
3630
3631    #[test]
3632    fn test_tokio_unfulfilled_shutdown() {
3633        let executor = tokio::Runner::default();
3634        test_unfulfilled_shutdown(executor);
3635    }
3636
3637    #[test]
3638    fn test_tokio_spawn_dedicated() {
3639        let executor = tokio::Runner::default();
3640        test_spawn_dedicated(executor);
3641    }
3642
3643    #[test]
3644    fn test_tokio_spawn() {
3645        let runner = tokio::Runner::default();
3646        test_spawn(runner);
3647    }
3648
3649    #[test]
3650    fn test_tokio_spawn_abort_on_parent_abort() {
3651        let runner = tokio::Runner::default();
3652        test_spawn_abort_on_parent_abort(runner);
3653    }
3654
3655    #[test]
3656    fn test_tokio_spawn_abort_on_parent_completion() {
3657        let runner = tokio::Runner::default();
3658        test_spawn_abort_on_parent_completion(runner);
3659    }
3660
3661    #[test]
3662    fn test_tokio_spawn_cascading_abort() {
3663        let runner = tokio::Runner::default();
3664        test_spawn_cascading_abort(runner);
3665    }
3666
3667    #[test]
3668    fn test_tokio_child_survives_sibling_completion() {
3669        let runner = tokio::Runner::default();
3670        test_child_survives_sibling_completion(runner);
3671    }
3672
3673    #[test]
3674    fn test_tokio_spawn_clone_chain() {
3675        let runner = tokio::Runner::default();
3676        test_spawn_clone_chain(runner);
3677    }
3678
3679    #[test]
3680    fn test_tokio_spawn_sparse_clone_chain() {
3681        let runner = tokio::Runner::default();
3682        test_spawn_sparse_clone_chain(runner);
3683    }
3684
3685    #[test]
3686    fn test_tokio_spawn_blocking() {
3687        for dedicated in [false, true] {
3688            let executor = tokio::Runner::default();
3689            test_spawn_blocking(executor, dedicated);
3690        }
3691    }
3692
3693    #[test]
3694    #[should_panic(expected = "blocking task panicked")]
3695    fn test_tokio_spawn_blocking_panic() {
3696        for dedicated in [false, true] {
3697            let executor = tokio::Runner::default();
3698            test_spawn_blocking_panic(executor, dedicated);
3699        }
3700    }
3701
3702    #[test]
3703    fn test_tokio_spawn_blocking_panic_caught() {
3704        for dedicated in [false, true] {
3705            let cfg = tokio::Config::default().with_catch_panics(true);
3706            let executor = tokio::Runner::new(cfg);
3707            test_spawn_blocking_panic_caught(executor, dedicated);
3708        }
3709    }
3710
3711    #[test]
3712    fn test_tokio_spawn_blocking_abort() {
3713        for (dedicated, blocking) in [(false, true), (true, false)] {
3714            let executor = tokio::Runner::default();
3715            test_spawn_abort(executor, dedicated, blocking);
3716        }
3717    }
3718
3719    #[test]
3720    fn test_tokio_circular_reference_prevents_cleanup() {
3721        let executor = tokio::Runner::default();
3722        test_circular_reference_prevents_cleanup(executor);
3723    }
3724
3725    #[test]
3726    fn test_tokio_late_waker() {
3727        let executor = tokio::Runner::default();
3728        test_late_waker(executor);
3729    }
3730
3731    #[test]
3732    fn test_tokio_metrics() {
3733        let executor = tokio::Runner::default();
3734        test_metrics(executor);
3735    }
3736
3737    #[test]
3738    fn test_tokio_process_rss_metric() {
3739        let executor = tokio::Runner::default();
3740        executor.start(|context| async move {
3741            loop {
3742                // Wait for RSS metric to be available
3743                let metrics = context.encode();
3744                if !metrics.contains("runtime_process_rss") {
3745                    context.sleep(Duration::from_millis(100)).await;
3746                    continue;
3747                }
3748
3749                // Verify the RSS value is eventually populated (greater than 0)
3750                for line in metrics.lines() {
3751                    if line.starts_with("runtime_process_rss")
3752                        && !line.starts_with("runtime_process_rss{")
3753                    {
3754                        let parts: Vec<&str> = line.split_whitespace().collect();
3755                        if parts.len() >= 2 {
3756                            let rss_value: i64 =
3757                                parts[1].parse().expect("Failed to parse RSS value");
3758                            if rss_value > 0 {
3759                                return;
3760                            }
3761                        }
3762                    }
3763                }
3764            }
3765        });
3766    }
3767
    #[test]
    fn test_tokio_telemetry() {
        // End-to-end check of the telemetry HTTP endpoint: start the metrics
        // server, register and bump a counter, then scrape /metrics over a raw
        // connection and verify the counter value appears in the body.
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            // Define the server address
            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

            // Configure telemetry
            tokio::telemetry::init(
                context.with_label("metrics"),
                tokio::telemetry::Logging {
                    level: Level::INFO,
                    json: false,
                },
                Some(address),
                None,
            );

            // Register a test metric
            let counter: Counter<u64> = Counter::default();
            context.register("test_counter", "Test counter", counter.clone());
            counter.inc();

            // Helper functions to parse HTTP response

            // Reads one LF-terminated line byte-by-byte, stripping a trailing
            // "\r" so both CRLF and bare-LF terminators are handled.
            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
                let mut line = Vec::new();
                loop {
                    let received = stream.recv(1).await?;
                    let byte = received.coalesce().as_ref()[0];
                    if byte == b'\n' {
                        if line.last() == Some(&b'\r') {
                            line.pop(); // Remove trailing \r
                        }
                        break;
                    }
                    line.push(byte);
                }
                String::from_utf8(line).map_err(|_| Error::ReadFailed)
            }

            // Reads header lines until the blank line ending the header block;
            // lines without a ": " separator are silently skipped.
            async fn read_headers<St: Stream>(
                stream: &mut St,
            ) -> Result<HashMap<String, String>, Error> {
                let mut headers = HashMap::new();
                loop {
                    let line = read_line(stream).await?;
                    if line.is_empty() {
                        break;
                    }
                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
                    if parts.len() == 2 {
                        headers.insert(parts[0].to_string(), parts[1].to_string());
                    }
                }
                Ok(headers)
            }

            // Reads exactly `content_length` bytes of body and decodes as UTF-8.
            async fn read_body<St: Stream>(
                stream: &mut St,
                content_length: usize,
            ) -> Result<String, Error> {
                let received = stream.recv(content_length).await?;
                String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
            }

            // Simulate a client connecting to the server
            let client_handle = context
                .with_label("client")
                .spawn(move |context| async move {
                    // Retry dialing until the telemetry server is accepting
                    // connections (it is spawned asynchronously above).
                    let (mut sink, mut stream) = loop {
                        match context.dial(address).await {
                            Ok((sink, stream)) => break (sink, stream),
                            Err(e) => {
                                // The client may be polled before the server is ready, that's alright!
                                error!(err =?e, "failed to connect");
                                context.sleep(Duration::from_millis(10)).await;
                            }
                        }
                    };

                    // Send a GET request to the server
                    let request = format!(
                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                    );
                    sink.send(Bytes::from(request)).await.unwrap();

                    // Read and verify the HTTP status line
                    let status_line = read_line(&mut stream).await.unwrap();
                    assert_eq!(status_line, "HTTP/1.1 200 OK");

                    // Read and parse headers
                    // NOTE(review): the lookup key is lowercase "content-length",
                    // which assumes the server emits lowercase header names —
                    // confirm against the telemetry server implementation.
                    let headers = read_headers(&mut stream).await.unwrap();
                    println!("Headers: {headers:?}");
                    let content_length = headers
                        .get("content-length")
                        .unwrap()
                        .parse::<usize>()
                        .unwrap();

                    // Read and verify the body
                    let body = read_body(&mut stream, content_length).await.unwrap();
                    assert!(body.contains("test_counter_total 1"));
                });

            // Wait for the client task to complete
            client_handle.await.unwrap();
        });
    }
3876
3877    #[test]
3878    fn test_tokio_resolver() {
3879        let executor = tokio::Runner::default();
3880        executor.start(|context| async move {
3881            let addrs = context.resolve("localhost").await.unwrap();
3882            assert!(!addrs.is_empty());
3883            for addr in addrs {
3884                assert!(
3885                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
3886                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
3887                );
3888            }
3889        });
3890    }
3891
3892    #[test]
3893    fn test_create_thread_pool_tokio() {
3894        let executor = tokio::Runner::default();
3895        executor.start(|context| async move {
3896            // Create a thread pool with 4 threads
3897            let pool = context
3898                .with_label("pool")
3899                .create_thread_pool(NZUsize!(4))
3900                .unwrap();
3901
3902            // Create a vector of numbers
3903            let v: Vec<_> = (0..10000).collect();
3904
3905            // Use the thread pool to sum the numbers
3906            pool.install(|| {
3907                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3908            });
3909        });
3910    }
3911
3912    #[test]
3913    fn test_create_thread_pool_deterministic() {
3914        let executor = deterministic::Runner::default();
3915        executor.start(|context| async move {
3916            // Create a thread pool with 4 threads
3917            let pool = context
3918                .with_label("pool")
3919                .create_thread_pool(NZUsize!(4))
3920                .unwrap();
3921
3922            // Create a vector of numbers
3923            let v: Vec<_> = (0..10000).collect();
3924
3925            // Use the thread pool to sum the numbers
3926            pool.install(|| {
3927                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3928            });
3929        });
3930    }
3931
    /// Asserts that a runtime's context exposes working network and storage
    /// buffer pools whose `max_per_class` limits match the expected values.
    fn test_buffer_pooler<R: Runner>(
        runner: R,
        expected_network_max_per_class: usize,
        expected_storage_max_per_class: usize,
    ) where
        R::Context: BufferPooler,
    {
        runner.start(|context| async move {
            // Verify network pool is accessible and works (cache-line aligned)
            let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
            assert!(net_buf.capacity() >= 1024);

            // Verify storage pool is accessible and works (page-aligned)
            // NOTE(review): the >= 4096 bound assumes storage allocations are
            // rounded up to a 4 KiB page — confirm against the pool config.
            let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
            assert!(storage_buf.capacity() >= 4096);

            // Verify pools have expected configurations
            assert_eq!(
                context.network_buffer_pool().config().max_per_class.get(),
                expected_network_max_per_class
            );
            assert_eq!(
                context.storage_buffer_pool().config().max_per_class.get(),
                expected_storage_max_per_class
            );
        });
    }
3959
3960    #[test]
3961    fn test_deterministic_buffer_pooler() {
3962        test_buffer_pooler(deterministic::Runner::default(), 4096, 32);
3963
3964        let runner = deterministic::Runner::new(
3965            deterministic::Config::default()
3966                .with_network_buffer_pool_config(
3967                    BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
3968                )
3969                .with_storage_buffer_pool_config(
3970                    BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
3971                ),
3972        );
3973        test_buffer_pooler(runner, 64, 8);
3974    }
3975
3976    #[test]
3977    fn test_tokio_buffer_pooler() {
3978        test_buffer_pooler(tokio::Runner::default(), 4096, 32);
3979
3980        let runner = tokio::Runner::new(
3981            tokio::Config::default()
3982                .with_network_buffer_pool_config(
3983                    BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
3984                )
3985                .with_storage_buffer_pool_config(
3986                    BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
3987                ),
3988        );
3989        test_buffer_pooler(runner, 64, 8);
3990    }
3991}