Skip to main content

commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.
18
19#![doc(
20    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21    html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
33stability_scope!(ALPHA {
34    pub mod deterministic;
35    pub mod mocks;
36});
37stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
38    pub mod benchmarks;
39});
40stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
41    mod iouring;
42});
43stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
44    pub mod tokio;
45});
46stability_scope!(BETA {
47    /// Re-export of `Buf` and `BufMut` traits for usage with [I/O buffers](iobuf).
48    pub use bytes::{Buf, BufMut};
49    use commonware_macros::select;
50    use commonware_parallel::{Rayon, ThreadPool};
51    /// Re-export of [governor::Quota] for rate limiting configuration.
52    pub use governor::Quota;
53    use iobuf::PoolError;
54    use rayon::ThreadPoolBuildError;
55    use std::{
56        future::Future,
57        io::Error as IoError,
58        net::SocketAddr,
59        num::NonZeroUsize,
60        time::{Duration, SystemTime},
61    };
62    pub(crate) use telemetry::metrics::{child_label, prefixed_name, METRICS_PREFIX};
63    use thiserror::Error;
64
65    pub mod iobuf;
66    pub use iobuf::{
67        cache_line_size, page_size, BufferPool, BufferPoolConfig, BufferPoolThreadCache,
68        Builder as IoBufsBuilder, IoBuf, IoBufMut, IoBufs, IoBufsMut,
69    };
70
71    pub mod utils;
72    pub use utils::*;
73
74    pub mod telemetry;
75
76    /// Default [`Blob`] version used when no version is specified via [`Storage::open`].
77    pub const DEFAULT_BLOB_VERSION: u16 = 0;
78
79    /// Errors that can occur when interacting with the runtime.
80    #[derive(Error, Debug)]
81    pub enum Error {
82        #[error("exited")]
83        Exited,
84        #[error("closed")]
85        Closed,
86        #[error("timeout")]
87        Timeout,
88        #[error("bind failed")]
89        BindFailed,
90        #[error("connection failed")]
91        ConnectionFailed,
92        #[error("write failed")]
93        WriteFailed,
94        #[error("read failed")]
95        ReadFailed,
96        #[error("send failed")]
97        SendFailed,
98        #[error("recv failed")]
99        RecvFailed,
100        #[error("dns resolution failed: {0}")]
101        ResolveFailed(String),
102        #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
103        PartitionNameInvalid(String),
104        #[error("partition creation failed: {0}")]
105        PartitionCreationFailed(String),
106        #[error("partition missing: {0}")]
107        PartitionMissing(String),
108        #[error("partition corrupt: {0}")]
109        PartitionCorrupt(String),
110        #[error("blob open failed: {0}/{1} error: {2}")]
111        BlobOpenFailed(String, String, IoError),
112        #[error("blob missing: {0}/{1}")]
113        BlobMissing(String, String),
114        #[error("blob resize failed: {0}/{1} error: {2}")]
115        BlobResizeFailed(String, String, IoError),
116        #[error("blob sync failed: {0}/{1} error: {2}")]
117        BlobSyncFailed(String, String, IoError),
118        #[error("blob insufficient length")]
119        BlobInsufficientLength,
120        #[error("blob corrupt: {0}/{1} reason: {2}")]
121        BlobCorrupt(String, String, String),
122        #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
123        BlobVersionMismatch {
124            expected: std::ops::RangeInclusive<u16>,
125            found: u16,
126        },
127        #[error("invalid or missing checksum")]
128        InvalidChecksum,
129        #[error("offset overflow")]
130        OffsetOverflow,
131        #[error("io error: {0}")]
132        Io(#[from] IoError),
133        #[error("buffer pool: {0}")]
134        Pool(#[from] PoolError),
135    }
136
137    /// Interface that any task scheduler must implement to start
138    /// running tasks.
139    pub trait Runner {
140        /// Context defines the environment available to tasks.
141        type Context;
142
143        /// Start running a root task.
144        ///
145        /// When this function returns, all spawned tasks will be canceled. If clean
146        /// shutdown cannot be implemented via `Drop`, consider using [Spawner::stop] and
147        /// [Spawner::stopped] to coordinate clean shutdown.
148        fn start<F, Fut>(self, f: F) -> Fut::Output
149        where
150            F: FnOnce(Self::Context) -> Fut,
151            Fut: Future;
152    }
153
154    /// The full identity of a [`Supervisor`] handle.
155    #[derive(Clone, Debug, Default)]
156    pub struct Name {
157        /// Label prefix built by successive [`Supervisor::child`] calls.
158        pub label: String,
159        /// Attributes attached via [`Supervisor::with_attribute`].
160        pub attributes: Vec<(String, String)>,
161    }
162
163    /// Interface to track task hierarchy and identity.
164    pub trait Supervisor: Send + Sync + 'static {
165        /// Return the current label prefix and attributes.
166        fn name(&self) -> Name;
167
168        /// Create a named child context with a new supervision-tree node.
169        ///
170        /// This appends `label` to the current metric prefix and creates a
171        /// child in the supervision tree. Use static role names like
172        /// `"engine"`, `"worker"`, or `"resolver"`. Dynamic values belong in
173        /// [`Supervisor::with_attribute`] so metric names remain bounded.
174        ///
175        /// Labels must start with `[a-zA-Z]` and contain only
176        /// `[a-zA-Z0-9_]`. Runtime-reserved metric prefixes must not be used.
177        #[must_use]
178        fn child(&self, label: &'static str) -> Self;
179
180        /// Add a key-value attribute to this context's identity.
181        ///
182        /// Attributes are attached to metrics registered in this context and
183        /// any child contexts. Unlike [`Supervisor::child`], attributes do not
184        /// affect metric names and do not create a supervision-tree edge. This
185        /// makes them the right place for dynamic values like epochs, rounds,
186        /// shards, or peer identifiers.
187        ///
188        /// Keys must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`.
189        /// Values can be any string. If the key already exists, its value is
190        /// replaced.
191        ///
192        /// ```text
193        /// context
194        ///   |-- child("orchestrator")
195        ///         |-- with_attribute("epoch", "5")
196        ///               |-- counter: votes      -> orchestrator_votes{epoch="5"}
197        ///               |-- counter: proposals  -> orchestrator_proposals{epoch="5"}
198        ///               |-- child("engine")
199        ///                     |-- gauge: height -> orchestrator_engine_height{epoch="5"}
200        /// ```
201        ///
202        /// This pattern avoids wrapping every metric in a `Family` and avoids
203        /// putting dynamic values in metric names like
204        /// `orchestrator_epoch_5_votes`.
205        ///
206        /// Attributes do not reduce cardinality. N epochs still means N time
207        /// series. They just make metrics easier to query, filter, and
208        /// aggregate.
209        ///
210        /// # Family Label Conflicts
211        ///
212        /// When using `Family` metrics, avoid attribute keys that match the
213        /// family's label field names. A conflict produces duplicate labels in
214        /// the encoded output, which is invalid Prometheus format.
215        ///
216        /// ```ignore
217        /// #[derive(EncodeLabelSet)]
218        /// struct Labels { env: String }
219        ///
220        /// // Bad: attribute "env" conflicts with Family field "env".
221        /// let ctx = context.child("api").with_attribute("env", "prod");
222        /// let family: Family<Labels, Counter> = Family::default();
223        /// ctx.register("requests", "help", family);
224        ///
225        /// // Good: use distinct names.
226        /// let ctx = context.child("api").with_attribute("region", "us_east");
227        /// ```
228        ///
229        /// # Querying The Latest Attribute
230        ///
231        /// To query the latest attribute value dynamically, create a gauge to
232        /// track the current value:
233        ///
234        /// ```ignore
235        /// let latest_epoch = context
236        ///     .child("orchestrator")
237        ///     .register("latest_epoch", "current epoch", Gauge::default());
238        /// latest_epoch.set(current_epoch);
239        /// ```
240        ///
241        /// A dashboard can then query `max(orchestrator_latest_epoch)` and use
242        /// the result as a variable in queries such as
243        /// `consensus_engine_votes_total{epoch="$latest_epoch"}`.
244        #[must_use]
245        fn with_attribute(self, key: &'static str, value: impl std::fmt::Display) -> Self;
246    }
247
248    /// Interface that any task scheduler must implement to spawn tasks.
249    pub trait Spawner: Supervisor {
250        /// Return a [`Spawner`] that schedules the next task onto the runtime's shared executor.
251        ///
252        /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
253        /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
254        /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
255        /// [`Spawner::dedicated`] instead.
256        ///
257        /// The shared executor with `blocking == false` is the default spawn mode.
258        #[must_use]
259        fn shared(self, blocking: bool) -> Self;
260
261        /// Return a [`Spawner`] that runs the next task on a dedicated thread when the runtime supports it.
262        ///
263        /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
264        /// shared executor.
265        ///
266        /// This is not the default behavior. See [`Spawner::shared`] for more information.
267        #[must_use]
268        fn dedicated(self) -> Self;
269
270        /// Spawn a task with the current context.
271        ///
272        /// Unlike directly awaiting a future, the task starts running immediately even if the caller
273        /// never awaits the returned [`Handle`].
274        ///
275        /// # Mandatory Supervision
276        ///
277        /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
278        ///
279        /// Spawn consumes the current context and runs the task at a new child node. Additional supervised
280        /// children are created explicitly with [`Supervisor::child`].
281        ///
282        /// ```txt
283        /// ctx_a
284        ///   |
285        ///   +-- child("worker") ---> ctx_c
286        ///   |                  |
287        ///   |                  +-- spawn() ---> Task C (ctx_d)
288        ///   |
289        ///   +-- spawn() ---> Task A (ctx_b)
290        ///                              |
291        ///                              +-- spawn() ---> Task B (ctx_e)
292        ///
293        /// Task A finishes or aborts --> Task B and Task C are aborted
294        /// ```
295        ///
296        /// # Spawn Configuration
297        ///
298        /// [`Spawner::dedicated`] and [`Spawner::shared`] only affect the
299        /// handle they return. [`Supervisor::child`] and [`Spawner::spawn`]
300        /// both start child task contexts from a clean spawn configuration.
301        ///
302        /// Child tasks should assume they start from a clean configuration without needing to inspect how their
303        /// parent was configured.
304        fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
305        where
306            Self: Sized,
307            F: FnOnce(Self) -> Fut + Send + 'static,
308            Fut: Future<Output = T> + Send + 'static,
309            T: Send + 'static;
310
311        /// Signals the runtime to stop execution and waits for all outstanding tasks
312        /// to perform any required cleanup and exit.
313        ///
314        /// This method does not actually kill any tasks but rather signals to them, using
315        /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
316        /// It then waits for all [signal::Signal] references to be dropped before returning.
317        ///
318        /// ## Multiple Stop Calls
319        ///
320        /// This method is idempotent and safe to call multiple times concurrently (on
321        /// different instances of the same context since it consumes `self`). The first
322        /// call initiates shutdown with the provided `value`, and all subsequent calls
323        /// will wait for the same completion regardless of their `value` parameter, i.e.
324        /// the original `value` from the first call is preserved.
325        ///
326        /// ## Timeout
327        ///
328        /// If a timeout is provided, the method will return an error if all [signal::Signal]
329        /// references have not been dropped within the specified duration.
330        fn stop(
331            self,
332            value: i32,
333            timeout: Option<Duration>,
334        ) -> impl Future<Output = Result<(), Error>> + Send;
335
336        /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
337        /// any task.
338        ///
339        /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
340        /// immediately. The [signal::Signal] returned will always resolve to the value of the
341        /// first [Spawner::stop] call.
342        fn stopped(&self) -> signal::Signal;
343    }
344
345    /// Trait for creating [rayon]-compatible thread pools with each worker thread
346    /// placed on dedicated threads via [Spawner].
347    pub trait ThreadPooler: Spawner {
348        /// Creates a clone-able [rayon]-compatible thread pool with [Spawner::spawn].
349        ///
350        /// # Arguments
351        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
352        ///
353        /// # Returns
354        /// A `Result` containing the configured [rayon::ThreadPool] or a [rayon::ThreadPoolBuildError] if the pool cannot
355        /// be built.
356        fn create_thread_pool(
357            &self,
358            concurrency: NonZeroUsize,
359        ) -> Result<ThreadPool, ThreadPoolBuildError>;
360
361        /// Creates a clone-able [Rayon] strategy for use with [commonware_parallel].
362        ///
363        /// # Arguments
364        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
365        ///
366        /// # Returns
367        /// A `Result` containing the configured [Rayon] strategy or a [rayon::ThreadPoolBuildError] if the pool cannot be
368        /// built.
369        fn create_strategy(
370            &self,
371            concurrency: NonZeroUsize,
372        ) -> Result<Rayon, ThreadPoolBuildError> {
373            self.create_thread_pool(concurrency).map(Rayon::with_pool)
374        }
375    }
376
377    /// Interface to register task traces.
378    pub trait Tracing: Supervisor {
379        /// Return a context that wraps the next spawned task in a `tracing` span.
380        ///
381        /// The span's `name` field and OpenTelemetry attributes are derived from
382        /// [`Supervisor::name`]. The flag is consumed by the next
383        /// [`Spawner::spawn`] call on the returned context.
384        ///
385        /// [`Supervisor::child`] creates a new child context and does not inherit
386        /// the span flag. [`Supervisor::with_attribute`], [`Spawner::shared`],
387        /// and [`Spawner::dedicated`] keep operating on the same handle, so they
388        /// can be chained before the spawn:
389        ///
390        /// ```ignore
391        /// context
392        ///     .child("verify")
393        ///     .with_attribute("round", round)
394        ///     .with_span()
395        ///     .dedicated()
396        ///     .spawn(|context| async move {
397        ///         // work
398        ///     });
399        /// ```
400        ///
401        /// Enabling the span only affects tracing. It does not change which
402        /// metrics are registered, nor does it widen the cardinality of runtime
403        /// task metrics.
404        #[must_use]
405        fn with_span(self) -> Self;
406    }
407
408    /// Interface to register and encode metrics.
409    pub trait Metrics: Supervisor {
410        /// Register a metric with the runtime.
411        ///
412        /// Any registered metric includes the current context label as its name
413        /// prefix and the current context attributes as Prometheus labels. See
414        /// [`Supervisor`] for the identity model and examples.
415        ///
416        /// The returned [`telemetry::metrics::Registered`] value must be retained for as long as the
417        /// metric should remain exposed. Dropping the returned handle unregisters
418        /// the metric immediately.
419        ///
420        /// Re-registering the same metric key (same prefixed name and attributes)
421        /// returns another handle to the existing metric when the concrete metric
422        /// type matches. Registering the same key with a different metric type
423        /// panics.
424        ///
425        /// Names must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`.
426        fn register<N: Into<String>, H: Into<String>, M: telemetry::metrics::Metric>(
427            &self,
428            name: N,
429            help: H,
430            metric: M,
431        ) -> telemetry::metrics::Registered<M>;
432
433        /// Encode all metrics into a buffer.
434        fn encode(&self) -> String;
435    }
436
437    /// Interface for both [`Tracing`] and [`Metrics`].
438    ///
439    /// A context carries multiple pieces of observability state. They compose
440    /// freely, but they do not all feed into the same sinks:
441    ///
442    /// - `label` (set by [`Supervisor::child`]): prefix applied to metrics
443    ///   registered with [`Metrics::register`]. It also populates the `name`
444    ///   field of runtime-internal task metrics (`runtime_tasks_spawned`,
445    ///   `runtime_tasks_running`).
446    /// - `attributes` (set by [`Supervisor::with_attribute`]): Prometheus label
447    ///   dimensions on metrics registered with [`Metrics::register`]. They are
448    ///   also emitted as OpenTelemetry attributes on the per-task tracing span
449    ///   when [`Tracing::with_span`] is enabled. Runtime task metrics ignore
450    ///   attributes to keep their cardinality bounded.
451    /// - `span` (set by [`Tracing::with_span`]): wraps the next spawned task in
452    ///   a `tracing` span populated from the current `label` and `attributes`.
453    ///   It never touches metrics.
454    ///
455    /// | Builder | Registered metric name | Registered metric labels | Runtime task metrics | Tracing span |
456    /// | --- | :---: | :---: | :---: | :---: |
457    /// | `child` | prefix | - | `name` | `name` field when `with_span` is set |
458    /// | `with_attribute` | - | label dimension | - | OTel attribute when `with_span` is set |
459    /// | `with_span` | - | - | - | enables span creation |
460    pub trait Observer: Tracing + Metrics {}
461
462    /// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
463    ///
464    /// This is a convenience type alias for creating single-entity rate limiters.
465    /// For per-key rate limiting, use [KeyedRateLimiter].
466    pub type RateLimiter<C> = governor::RateLimiter<
467        governor::state::NotKeyed,
468        governor::state::InMemoryState,
469        C,
470        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
471    >;
472
473    /// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
474    ///
475    /// This is a convenience type alias for creating per-peer rate limiters
476    /// using governor's [HashMapStateStore].
477    ///
478    /// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
479    pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
480        K,
481        governor::state::keyed::HashMapStateStore<K>,
482        C,
483        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
484    >;
485
486    /// Interface that any task scheduler must implement to provide
487    /// time-based operations.
488    ///
489    /// It is necessary to mock time to provide deterministic execution
490    /// of arbitrary tasks.
491    pub trait Clock:
492        governor::clock::Clock<Instant = SystemTime>
493        + governor::clock::ReasonablyRealtime
494        + Send
495        + Sync
496        + 'static
497    {
498        /// Returns the current time.
499        fn current(&self) -> SystemTime;
500
501        /// Sleep for the given duration.
502        fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
503
504        /// Sleep until the given deadline.
505        fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
506
507        /// Await a future with a timeout, returning `Error::Timeout` if it expires.
508        ///
509        /// # Examples
510        ///
511        /// ```
512        /// use std::time::Duration;
513        /// use commonware_runtime::{deterministic, Error, Runner, Clock};
514        ///
515        /// let executor = deterministic::Runner::default();
516        /// executor.start(|context| async move {
517        ///     match context
518        ///         .timeout(Duration::from_millis(100), async { 42 })
519        ///         .await
520        ///     {
521        ///         Ok(value) => assert_eq!(value, 42),
522        ///         Err(Error::Timeout) => panic!("should not timeout"),
523        ///         Err(e) => panic!("unexpected error: {:?}", e),
524        ///     }
525        /// });
526        /// ```
527        fn timeout<F, T>(
528            &self,
529            duration: Duration,
530            future: F,
531        ) -> impl Future<Output = Result<T, Error>> + Send + '_
532        where
533            F: Future<Output = T> + Send + 'static,
534            T: Send + 'static,
535        {
536            async move {
537                select! {
538                    result = future => Ok(result),
539                    _ = self.sleep(duration) => Err(Error::Timeout),
540                }
541            }
542        }
543    }
544
545    /// Syntactic sugar for the type of [Sink] used by a given [Network] N.
546    pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
547
548    /// Syntactic sugar for the type of [Stream] used by a given [Network] N.
549    pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
550
551    /// Syntactic sugar for the type of [Listener] used by a given [Network] N.
552    pub type ListenerOf<N> = <N as crate::Network>::Listener;
553
554    /// Interface that any runtime must implement to create
555    /// network connections.
556    pub trait Network: Send + Sync + 'static {
557        /// The type of [Listener] that's returned when binding to a socket.
558        /// Accepting a connection returns a [Sink] and [Stream] which are defined
559        /// by the [Listener] and used to send and receive data over the connection.
560        type Listener: Listener;
561
562        /// Bind to the given socket address.
563        fn bind(
564            &self,
565            socket: SocketAddr,
566        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
567
568        /// Dial the given socket address.
569        fn dial(
570            &self,
571            socket: SocketAddr,
572        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
573    }
574
575    /// Interface for DNS resolution.
576    pub trait Resolver: Send + Sync + 'static {
577        /// Resolve a hostname to IP addresses.
578        ///
579        /// Returns a list of IP addresses that the hostname resolves to.
580        fn resolve(
581            &self,
582            host: &str,
583        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
584    }
585
586    /// Interface that any runtime must implement to handle
587    /// incoming network connections.
588    pub trait Listener: Sync + Send + 'static {
589        /// The type of [Sink] that's returned when accepting a connection.
590        /// This is used to send data to the remote connection.
591        type Sink: Sink;
592        /// The type of [Stream] that's returned when accepting a connection.
593        /// This is used to receive data from the remote connection.
594        type Stream: Stream;
595
596        /// Accept an incoming connection.
597        fn accept(
598            &mut self,
599        ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
600
601        /// Returns the local address of the listener.
602        fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
603    }
604
605    /// Interface that any runtime must implement to send
606    /// messages over a network connection.
607    pub trait Sink: Sync + Send + 'static {
608        /// Send a message to the sink.
609        ///
610        /// # Warning
611        ///
612        /// If the sink returns an error, part of the message may still be delivered.
613        /// After any error, the sink is no longer reusable and subsequent sends will
614        /// return [`Error::Closed`].
615        ///
616        /// Dropping the future (e.g. via `select!`) also poisons the sink, since a
617        /// partial write may have occurred.
618        fn send(
619            &mut self,
620            bufs: impl Into<IoBufs> + Send,
621        ) -> impl Future<Output = Result<(), Error>> + Send;
622    }
623
624    /// Interface that any runtime must implement to receive
625    /// messages over a network connection.
626    pub trait Stream: Sync + Send + 'static {
627        /// Receive exactly `len` bytes from the stream.
628        ///
629        /// The runtime allocates the buffer and returns it as `IoBufs`.
630        ///
631        /// # Warning
632        ///
633        /// If the stream returns an error, partially read data may be discarded.
634        /// After any error, the stream is no longer reusable and subsequent receives
635        /// will return [`Error::Closed`].
636        ///
637        /// Dropping the future (e.g. via `select!`) also poisons the stream, since
638        /// partially read data may be lost.
639        fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;
640
641        /// Peek at buffered data without consuming.
642        ///
643        /// Returns up to `max_len` bytes from the internal buffer, or an empty slice
644        /// if no data is currently buffered. This does not perform any I/O or block.
645        ///
646        /// This is useful e.g. for parsing length prefixes without committing to a read
647        /// or paying the cost of async.
648        fn peek(&self, max_len: usize) -> &[u8];
649    }
650
651    /// Interface to interact with storage.
652    ///
653    /// To support storage implementations that enable concurrent reads and
654    /// writes, blobs are responsible for maintaining synchronization.
655    ///
656    /// Storage can be backed by a local filesystem, cloud storage, etc.
657    ///
658    /// # Partition Names
659    ///
660    /// Partition names must be non-empty and contain only ASCII alphanumeric
661    /// characters, dashes (`-`), or underscores (`_`). Names containing other
662    /// characters (e.g., `/`, `.`, spaces) will return an error.
663    pub trait Storage: Send + Sync + 'static {
664        /// The readable/writeable storage buffer that can be opened by this Storage.
665        type Blob: Blob;
666
667        /// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
668        /// in the versions range. The blob version is omitted from the return value.
669        fn open(
670            &self,
671            partition: &str,
672            name: &[u8],
673        ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
674            async move {
675                let (blob, size, _) = self
676                    .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
677                    .await?;
678                Ok((blob, size))
679            }
680        }
681
682        /// Open an existing blob in a given partition or create a new one, returning
683        /// the blob and its length.
684        ///
685        /// Multiple instances of the same blob can be opened concurrently, however,
686        /// writing to the same blob concurrently may lead to undefined behavior.
687        ///
688        /// An Ok result indicates the blob is durably created (or already exists).
689        ///
690        /// # Versions
691        ///
692        /// Blobs are versioned. If the blob's version is not in `versions`, returns
693        /// [Error::BlobVersionMismatch].
694        ///
695        /// # Returns
696        ///
697        /// A tuple of (blob, logical_size, blob_version).
698        fn open_versioned(
699            &self,
700            partition: &str,
701            name: &[u8],
702            versions: std::ops::RangeInclusive<u16>,
703        ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
704
705        /// Remove a blob from a given partition.
706        ///
707        /// If no `name` is provided, the entire partition is removed.
708        ///
709        /// An Ok result indicates the blob is durably removed.
710        fn remove(
711            &self,
712            partition: &str,
713            name: Option<&[u8]>,
714        ) -> impl Future<Output = Result<(), Error>> + Send;
715
716        /// Return all blobs in a given partition.
717        fn scan(&self, partition: &str)
718            -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
719    }
720
721    /// Interface to read and write to a blob.
722    ///
723    /// To support blob implementations that enable concurrent reads and
724    /// writes, blobs are responsible for maintaining synchronization.
725    ///
726    /// Cloning a blob is similar to wrapping a single file descriptor in
727    /// a lock whereas opening a new blob (of the same name) is similar to
728    /// opening a new file descriptor. If multiple blobs are opened with the same
729    /// name, they are not expected to coordinate access to underlying storage
730    /// and writing to both is undefined behavior.
731    ///
732    /// When a blob is dropped, any unsynced changes may be discarded. Implementations
733    /// may attempt to sync during drop but errors will go unhandled. Call `sync`
734    /// before dropping to ensure all changes are durably persisted.
735    #[allow(clippy::len_without_is_empty)]
736    pub trait Blob: Clone + Send + Sync + 'static {
737        /// Read `len` bytes at `offset` into caller-provided buffer(s).
738        ///
739        /// The caller provides the buffer(s), and the implementation fills it with
740        /// exactly `len` bytes of data read from the blob starting at `offset`.
741        /// Returns the same buffer(s), filled with data.
742        ///
743        /// # Contract
744        ///
745        /// - The returned buffers reuse caller-provided storage, with exactly `len`
746        ///   bytes filled from `offset`.
747        /// - Caller-provided chunk layout is preserved.
748        ///
749        /// # Panics
750        ///
751        /// Panics if `len` exceeds the total capacity of `bufs`.
752        fn read_at_buf(
753            &self,
754            offset: u64,
755            len: usize,
756            bufs: impl Into<IoBufsMut> + Send,
757        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
758
759        /// Read `len` bytes at `offset`, returning a buffer(s) with exactly `len` bytes
760        /// of data read from the blob starting at `offset`.
761        ///
762        /// To reuse a buffer(s), use [`Blob::read_at_buf`].
763        fn read_at(
764            &self,
765            offset: u64,
766            len: usize,
767        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
768
769        /// Write `bufs` to the blob at the given offset.
770        fn write_at(
771            &self,
772            offset: u64,
773            bufs: impl Into<IoBufs> + Send,
774        ) -> impl Future<Output = Result<(), Error>> + Send;
775
776        /// Write `bufs` to the blob at the given offset and durably persist that write.
777        ///
778        /// This is not a durability barrier for previous operations. When it completes,
779        /// only the bytes submitted to this call are guaranteed durable. Earlier unsynced
780        /// [`Blob::write_at`] or [`Blob::resize`] calls require [`Blob::sync`] to become
781        /// durable.
782        fn write_at_sync(
783            &self,
784            offset: u64,
785            bufs: impl Into<IoBufs> + Send,
786        ) -> impl Future<Output = Result<(), Error>> + Send;
787
788        /// Resize the blob to the given length.
789        ///
790        /// If the length is greater than the current length, the blob is extended with zeros.
791        /// If the length is less than the current length, the blob is resized.
792        fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
793
794        /// Ensure all pending data is durably persisted.
795        fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
796    }
797
    /// Interface that any runtime must implement to provide buffer pools.
    ///
    /// Two pools are exposed: one for network I/O and one for storage I/O. Both are
    /// borrowed from `self`.
    pub trait BufferPooler: Send + Sync + 'static {
        /// Returns the network [BufferPool].
        fn network_buffer_pool(&self) -> &BufferPool;

        /// Returns the storage [BufferPool].
        fn storage_buffer_pool(&self) -> &BufferPool;
    }
806});
807stability_scope!(BETA, cfg(feature = "external") {
    /// Interface that runtimes can implement to constrain the execution latency of a future.
    pub trait Pacer: Clock + Send + Sync + 'static {
        /// Defer completion of a future until a specified `latency` has elapsed. If the future is
        /// not yet ready at the desired time of completion, the runtime will block until the future
        /// is ready.
        ///
        /// In [crate::deterministic], this is used so that interactions with external systems can
        /// be driven deterministically. In [crate::tokio], this is a no-op (so the same code can be
        /// tested under multiple runtimes with no changes).
        ///
        /// # Setting Latency
        ///
        /// `pace` is not meant to be a time penalty applied to awaited futures; `latency` should be
        /// set to the expected resolution latency of the future. To better explore the possible
        /// behavior of an application, users can set latency to a randomly chosen value in the range
        /// of `[expected latency / 2, expected latency * 2]`.
        ///
        /// # Warning
        ///
        /// Because `pace` blocks if the future is not ready, it is important that the future's
        /// completion doesn't require anything in the current thread to complete (or else it will
        /// deadlock).
        fn pace<'a, F, T>(
            &'a self,
            latency: Duration,
            future: F,
        ) -> impl Future<Output = T> + Send + 'a
        where
            F: Future<Output = T> + Send + 'a,
            T: Send + 'a;
    }
838
839    /// Extension trait that makes it more ergonomic to use [Pacer].
840    ///
841    /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
842    /// runtime should delay completion relative to the clock.
843    pub trait FutureExt: Future + Send + Sized {
844        /// Delay completion of the future until a specified `latency` on `pacer`.
845        fn pace<'a, E>(
846            self,
847            pacer: &'a E,
848            latency: Duration,
849        ) -> impl Future<Output = Self::Output> + Send + 'a
850        where
851            E: Pacer + 'a,
852            Self: Send + 'a,
853            Self::Output: Send + 'a,
854        {
855            pacer.pace(latency, self)
856        }
857    }
858
859    impl<F> FutureExt for F where F: Future + Send {}
860});
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865    use crate::telemetry::{
866        metrics::{
867            count_running_tasks,
868            raw::{Counter, Family},
869            EncodeLabelKey, EncodeLabelSetTrait as EncodeLabelSet,
870            EncodeLabelValueTrait as EncodeLabelValue, LabelSetEncoder,
871        },
872        traces::collector::TraceStorage,
873    };
874    use bytes::Bytes;
875    use commonware_macros::{select, test_collect_traces};
876    use commonware_utils::{
877        channel::{mpsc, oneshot},
878        sync::Mutex,
879        NZUsize, SystemTimeExt, NZU32,
880    };
881    use futures::{
882        future::{pending, ready},
883        join, pin_mut, FutureExt,
884    };
885    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
886    use std::{
887        collections::HashMap,
888        net::{IpAddr, Ipv4Addr, Ipv6Addr},
889        pin::Pin,
890        str::FromStr,
891        sync::{
892            atomic::{AtomicU32, Ordering},
893            Arc,
894        },
895        task::{Context as TContext, Poll, Waker},
896    };
897    use tracing::{error, Level};
898    use utils::reschedule;
899
900    fn test_error_future<R: Runner>(runner: R) {
901        #[allow(clippy::unused_async)]
902        async fn error_future() -> Result<&'static str, &'static str> {
903            Err("An error occurred")
904        }
905        let result = runner.start(|_| error_future());
906        assert_eq!(result, Err("An error occurred"));
907    }
908
909    fn test_clock_sleep<R: Runner>(runner: R)
910    where
911        R::Context: Spawner + Clock,
912    {
913        runner.start(|context| async move {
914            // Capture initial time
915            let start = context.current();
916            let sleep_duration = Duration::from_millis(10);
917            context.sleep(sleep_duration).await;
918
919            // After run, time should have advanced
920            let end = context.current();
921            assert!(end.duration_since(start).unwrap() >= sleep_duration);
922        });
923    }
924
925    fn test_clock_sleep_until<R: Runner>(runner: R)
926    where
927        R::Context: Spawner + Clock + Metrics,
928    {
929        runner.start(|context| async move {
930            // Trigger sleep
931            let now = context.current();
932            context.sleep_until(now + Duration::from_millis(100)).await;
933
934            // Ensure slept duration has elapsed
935            let elapsed = now.elapsed().unwrap();
936            assert!(elapsed >= Duration::from_millis(100));
937        });
938    }
939
940    fn test_clock_sleep_until_far_future<R: Runner>(runner: R)
941    where
942        R::Context: Spawner + Clock,
943    {
944        runner.start(|context| async move {
945            let sleep = context.sleep_until(SystemTime::limit());
946            let result = context.timeout(Duration::from_millis(1), sleep).await;
947            assert!(matches!(result, Err(Error::Timeout)));
948        });
949    }
950
951    fn test_clock_timeout<R: Runner>(runner: R)
952    where
953        R::Context: Spawner + Clock,
954    {
955        runner.start(|context| async move {
956            // Future completes before timeout
957            let result = context
958                .timeout(Duration::from_millis(100), async { "success" })
959                .await;
960            assert_eq!(result.unwrap(), "success");
961
962            // Future exceeds timeout duration
963            let result = context
964                .timeout(Duration::from_millis(50), pending::<()>())
965                .await;
966            assert!(matches!(result, Err(Error::Timeout)));
967
968            // Future completes within timeout
969            let result = context
970                .timeout(
971                    Duration::from_millis(100),
972                    context.sleep(Duration::from_millis(50)),
973                )
974                .await;
975            assert!(result.is_ok());
976        });
977    }
978
979    fn test_root_finishes<R: Runner>(runner: R)
980    where
981        R::Context: Spawner,
982    {
983        runner.start(|context| async move {
984            context.spawn(|_| async move {
985                loop {
986                    reschedule().await;
987                }
988            });
989        });
990    }
991
992    fn test_spawn_after_abort<R>(runner: R)
993    where
994        R: Runner,
995        R::Context: Spawner,
996    {
997        runner.start(|context| async move {
998            // Create a child context
999            let child = context.child("child");
1000
1001            // Spawn parent and abort
1002            let parent_handle = context.spawn(move |_| async move {
1003                pending::<()>().await;
1004            });
1005            parent_handle.abort();
1006
1007            // Spawn child and ensure it aborts
1008            let child_handle = child.spawn(move |_| async move {
1009                pending::<()>().await;
1010            });
1011            assert!(matches!(child_handle.await, Err(Error::Closed)));
1012        });
1013    }
1014
1015    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
1016    where
1017        R::Context: Spawner,
1018    {
1019        runner.start(|context| async move {
1020            let context = if dedicated {
1021                assert!(!blocking);
1022                context.dedicated()
1023            } else {
1024                context.shared(blocking)
1025            };
1026
1027            let handle = context.spawn(|_| async move {
1028                loop {
1029                    reschedule().await;
1030                }
1031            });
1032            handle.abort();
1033            assert!(matches!(handle.await, Err(Error::Closed)));
1034        });
1035    }
1036
1037    fn test_panic_aborts_root<R: Runner>(runner: R) {
1038        let result: Result<(), Error> = runner.start(|_| async move {
1039            panic!("blah");
1040        });
1041        result.unwrap_err();
1042    }
1043
1044    fn test_panic_aborts_spawn<R: Runner>(runner: R)
1045    where
1046        R::Context: Spawner + Clock,
1047    {
1048        runner.start(|context| async move {
1049            context.child("panic").spawn(|_| async move {
1050                panic!("blah");
1051            });
1052
1053            // Loop until panic
1054            loop {
1055                context.sleep(Duration::from_millis(100)).await;
1056            }
1057        });
1058    }
1059
1060    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1061    where
1062        R::Context: Spawner + Clock,
1063    {
1064        let result: Result<(), Error> = runner.start(|context| async move {
1065            let result = context.child("panic").spawn(|_| async move {
1066                panic!("blah");
1067            });
1068            result.await
1069        });
1070        assert!(matches!(result, Err(Error::Exited)));
1071    }
1072
1073    fn test_multiple_panics<R: Runner>(runner: R)
1074    where
1075        R::Context: Spawner + Clock,
1076    {
1077        runner.start(|context| async move {
1078            context.child("panic").spawn(|_| async move {
1079                panic!("boom 1");
1080            });
1081            context.child("panic").spawn(|_| async move {
1082                panic!("boom 2");
1083            });
1084            context.child("panic").spawn(|_| async move {
1085                panic!("boom 3");
1086            });
1087
1088            // Loop until panic
1089            loop {
1090                context.sleep(Duration::from_millis(100)).await;
1091            }
1092        });
1093    }
1094
1095    fn test_multiple_panics_caught<R: Runner>(runner: R)
1096    where
1097        R::Context: Spawner + Clock,
1098    {
1099        let (res1, res2, res3) = runner.start(|context| async move {
1100            let handle1 = context.child("panic").spawn(|_| async move {
1101                panic!("boom 1");
1102            });
1103            let handle2 = context.child("panic").spawn(|_| async move {
1104                panic!("boom 2");
1105            });
1106            let handle3 = context.child("panic").spawn(|_| async move {
1107                panic!("boom 3");
1108            });
1109
1110            join!(handle1, handle2, handle3)
1111        });
1112        assert!(matches!(res1, Err(Error::Exited)));
1113        assert!(matches!(res2, Err(Error::Exited)));
1114        assert!(matches!(res3, Err(Error::Exited)));
1115    }
1116
1117    fn test_select<R: Runner>(runner: R) {
1118        runner.start(|_| async move {
1119            // Test first branch
1120            let output = Mutex::new(0);
1121            select! {
1122                v1 = ready(1) => {
1123                    *output.lock() = v1;
1124                },
1125                v2 = ready(2) => {
1126                    *output.lock() = v2;
1127                },
1128            };
1129            assert_eq!(*output.lock(), 1);
1130
1131            // Test second branch
1132            select! {
1133                v1 = std::future::pending::<i32>() => {
1134                    *output.lock() = v1;
1135                },
1136                v2 = ready(2) => {
1137                    *output.lock() = v2;
1138                },
1139            };
1140            assert_eq!(*output.lock(), 2);
1141        });
1142    }
1143
1144    /// Ensure future fusing works as expected.
1145    fn test_select_loop<R: Runner>(runner: R)
1146    where
1147        R::Context: Clock,
1148    {
1149        runner.start(|context| async move {
1150            // Should hit timeout
1151            let (sender, mut receiver) = mpsc::unbounded_channel();
1152            for _ in 0..2 {
1153                select! {
1154                    v = receiver.recv() => {
1155                        panic!("unexpected value: {v:?}");
1156                    },
1157                    _ = context.sleep(Duration::from_millis(100)) => {
1158                        continue;
1159                    },
1160                };
1161            }
1162
1163            // Populate channel
1164            sender.send(0).unwrap();
1165            sender.send(1).unwrap();
1166
1167            // Prefer not reading channel without losing messages
1168            select! {
1169                _ = async {} => {
1170                    // Skip reading from channel even though populated
1171                },
1172                v = receiver.recv() => {
1173                    panic!("unexpected value: {v:?}");
1174                },
1175            };
1176
1177            // Process messages
1178            for i in 0..2 {
1179                select! {
1180                    _ = context.sleep(Duration::from_millis(100)) => {
1181                        panic!("timeout");
1182                    },
1183                    v = receiver.recv() => {
1184                        assert_eq!(v.unwrap(), i);
1185                    },
1186                };
1187            }
1188        });
1189    }
1190
1191    fn test_storage_operations<R: Runner>(runner: R)
1192    where
1193        R::Context: Storage,
1194    {
1195        runner.start(|context| async move {
1196            let partition = "test_partition";
1197            let name = b"test_blob";
1198
1199            // Open a new blob
1200            let (blob, size) = context
1201                .open(partition, name)
1202                .await
1203                .expect("Failed to open blob");
1204            assert_eq!(size, 0, "new blob should have size 0");
1205
1206            // Write data to the blob
1207            let data = b"Hello, Storage!";
1208            blob.write_at(0, data)
1209                .await
1210                .expect("Failed to write to blob");
1211
1212            // Sync the blob
1213            blob.sync().await.expect("Failed to sync blob");
1214
1215            // Read data from the blob
1216            let read = blob
1217                .read_at(0, data.len())
1218                .await
1219                .expect("Failed to read from blob");
1220            assert_eq!(read.coalesce(), data);
1221
1222            // Sync the blob
1223            blob.sync().await.expect("Failed to sync blob");
1224
1225            // Scan blobs in the partition
1226            let blobs = context
1227                .scan(partition)
1228                .await
1229                .expect("Failed to scan partition");
1230            assert!(blobs.contains(&name.to_vec()));
1231
1232            // Reopen the blob
1233            let (blob, len) = context
1234                .open(partition, name)
1235                .await
1236                .expect("Failed to reopen blob");
1237            assert_eq!(len, data.len() as u64);
1238
1239            // Read data part of message back
1240            let read = blob.read_at(7, 7).await.expect("Failed to read data");
1241            assert_eq!(read.coalesce(), b"Storage");
1242
1243            // Sync the blob
1244            blob.sync().await.expect("Failed to sync blob");
1245
1246            // Remove the blob
1247            context
1248                .remove(partition, Some(name))
1249                .await
1250                .expect("Failed to remove blob");
1251
1252            // Ensure the blob is removed
1253            let blobs = context
1254                .scan(partition)
1255                .await
1256                .expect("Failed to scan partition");
1257            assert!(!blobs.contains(&name.to_vec()));
1258
1259            // Remove the partition
1260            context
1261                .remove(partition, None)
1262                .await
1263                .expect("Failed to remove partition");
1264
1265            // Scan the partition
1266            let result = context.scan(partition).await;
1267            assert!(matches!(result, Err(Error::PartitionMissing(_))));
1268        });
1269    }
1270
1271    fn test_blob_read_write<R: Runner>(runner: R)
1272    where
1273        R::Context: Storage,
1274    {
1275        runner.start(|context| async move {
1276            let partition = "test_partition";
1277            let name = b"test_blob_rw";
1278
1279            // Open a new blob
1280            let (blob, _) = context
1281                .open(partition, name)
1282                .await
1283                .expect("Failed to open blob");
1284
1285            // Write data at different offsets
1286            let data1 = b"Hello";
1287            let data2 = b"World";
1288            blob.write_at(0, data1)
1289                .await
1290                .expect("Failed to write data1");
1291            blob.write_at(5, data2)
1292                .await
1293                .expect("Failed to write data2");
1294
1295            // Read data back
1296            let read = blob.read_at(0, 10).await.expect("Failed to read data");
1297            let read = read.coalesce();
1298            assert_eq!(&read.as_ref()[..5], data1);
1299            assert_eq!(&read.as_ref()[5..], data2);
1300
1301            // Read past end of blob
1302            let result = blob.read_at(10, 10).await;
1303            assert!(result.is_err());
1304
1305            // Rewrite data without affecting length
1306            let data3 = b"Store";
1307            blob.write_at(5, data3)
1308                .await
1309                .expect("Failed to write data3");
1310
1311            // Read data back
1312            let read = blob.read_at(0, 10).await.expect("Failed to read data");
1313            let read = read.coalesce();
1314            assert_eq!(&read.as_ref()[..5], data1);
1315            assert_eq!(&read.as_ref()[5..], data3);
1316
1317            // Read past end of blob
1318            let result = blob.read_at(10, 10).await;
1319            assert!(result.is_err());
1320        });
1321    }
1322
1323    fn test_blob_resize<R: Runner>(runner: R)
1324    where
1325        R::Context: Storage,
1326    {
1327        runner.start(|context| async move {
1328            let partition = "test_partition_resize";
1329            let name = b"test_blob_resize";
1330
1331            // Open and write to a new blob
1332            let (blob, _) = context
1333                .open(partition, name)
1334                .await
1335                .expect("Failed to open blob");
1336
1337            let data = b"some data";
1338            blob.write_at(0, data.to_vec())
1339                .await
1340                .expect("Failed to write");
1341            blob.sync().await.expect("Failed to sync after write");
1342
1343            // Re-open and check length
1344            let (blob, len) = context.open(partition, name).await.unwrap();
1345            assert_eq!(len, data.len() as u64);
1346
1347            // Resize to extend the file
1348            let new_len = (data.len() as u64) * 2;
1349            blob.resize(new_len)
1350                .await
1351                .expect("Failed to resize to extend");
1352            blob.sync().await.expect("Failed to sync after resize");
1353
1354            // Re-open and check length again
1355            let (blob, len) = context.open(partition, name).await.unwrap();
1356            assert_eq!(len, new_len);
1357
1358            // Read original data
1359            let read_buf = blob.read_at(0, data.len()).await.unwrap();
1360            assert_eq!(read_buf.coalesce(), data);
1361
1362            // Read extended part (should be zeros)
1363            let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
1364            assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());
1365
1366            // Truncate the blob
1367            blob.resize(data.len() as u64).await.unwrap();
1368            blob.sync().await.unwrap();
1369
1370            // Reopen to check truncation
1371            let (blob, size) = context.open(partition, name).await.unwrap();
1372            assert_eq!(size, data.len() as u64);
1373
1374            // Read truncated data
1375            let read_buf = blob.read_at(0, data.len()).await.unwrap();
1376            assert_eq!(read_buf.coalesce(), data);
1377            blob.sync().await.unwrap();
1378        });
1379    }
1380
1381    fn test_many_partition_read_write<R: Runner>(runner: R)
1382    where
1383        R::Context: Storage,
1384    {
1385        runner.start(|context| async move {
1386            let partitions = ["partition1", "partition2", "partition3"];
1387            let name = b"test_blob_rw";
1388            let data1 = b"Hello";
1389            let data2 = b"World";
1390
1391            for (additional, partition) in partitions.iter().enumerate() {
1392                // Open a new blob
1393                let (blob, _) = context
1394                    .open(partition, name)
1395                    .await
1396                    .expect("Failed to open blob");
1397
1398                // Write data at different offsets
1399                blob.write_at(0, data1)
1400                    .await
1401                    .expect("Failed to write data1");
1402                blob.write_at(5 + additional as u64, data2)
1403                    .await
1404                    .expect("Failed to write data2");
1405
1406                // Sync the blob
1407                blob.sync().await.expect("Failed to sync blob");
1408            }
1409
1410            for (additional, partition) in partitions.iter().enumerate() {
1411                // Open a new blob
1412                let (blob, len) = context
1413                    .open(partition, name)
1414                    .await
1415                    .expect("Failed to open blob");
1416                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1417
1418                // Read data back
1419                let read = blob
1420                    .read_at(0, 10 + additional)
1421                    .await
1422                    .expect("Failed to read data");
1423                let read = read.coalesce();
1424                assert_eq!(&read.as_ref()[..5], b"Hello");
1425                assert_eq!(&read.as_ref()[5 + additional..], b"World");
1426            }
1427        });
1428    }
1429
1430    fn test_blob_read_past_length<R: Runner>(runner: R)
1431    where
1432        R::Context: Storage,
1433    {
1434        runner.start(|context| async move {
1435            let partition = "test_partition";
1436            let name = b"test_blob_rw";
1437
1438            // Open a new blob
1439            let (blob, _) = context
1440                .open(partition, name)
1441                .await
1442                .expect("Failed to open blob");
1443
1444            // Read data past file length (empty file)
1445            let result = blob.read_at(0, 10).await;
1446            assert!(result.is_err());
1447
1448            // Write data to the blob
1449            let data = b"Hello, Storage!".to_vec();
1450            blob.write_at(0, data)
1451                .await
1452                .expect("Failed to write to blob");
1453
1454            // Read data past file length (non-empty file)
1455            let result = blob.read_at(0, 20).await;
1456            assert!(result.is_err());
1457        })
1458    }
1459
1460    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1461    where
1462        R::Context: Spawner + Storage + Metrics,
1463    {
1464        runner.start(|context| async move {
1465            let partition = "test_partition";
1466            let name = b"test_blob_rw";
1467
1468            // Open a new blob
1469            let (blob, _) = context
1470                .open(partition, name)
1471                .await
1472                .expect("Failed to open blob");
1473
1474            // Write data to the blob
1475            let data = b"Hello, Storage!";
1476            blob.write_at(0, data)
1477                .await
1478                .expect("Failed to write to blob");
1479
1480            // Sync the blob
1481            blob.sync().await.expect("Failed to sync blob");
1482
1483            // Read data from the blob in clone
1484            let check1 = context.child("check1").spawn({
1485                let blob = blob.clone();
1486                let data_len = data.len();
1487                move |_| async move {
1488                    let read = blob
1489                        .read_at(0, data_len)
1490                        .await
1491                        .expect("Failed to read from blob");
1492                    assert_eq!(read.coalesce(), data);
1493                }
1494            });
1495            let check2 = context.child("check2").spawn({
1496                let blob = blob.clone();
1497                let data_len = data.len();
1498                move |_| async move {
1499                    let read = blob
1500                        .read_at(0, data_len)
1501                        .await
1502                        .expect("Failed to read from blob");
1503                    assert_eq!(read.coalesce(), data);
1504                }
1505            });
1506
1507            // Wait for both reads to complete
1508            let result = join!(check1, check2);
1509            assert!(result.0.is_ok());
1510            assert!(result.1.is_ok());
1511
1512            // Read data from the blob
1513            let read = blob
1514                .read_at(0, data.len())
1515                .await
1516                .expect("Failed to read from blob");
1517            assert_eq!(read.coalesce(), data);
1518
1519            // Drop the blob
1520            drop(blob);
1521
1522            // Ensure no blobs still open
1523            let buffer = context.encode();
1524            assert!(buffer.contains("open_blobs 0"));
1525        });
1526    }
1527
1528    fn test_shutdown<R: Runner>(runner: R)
1529    where
1530        R::Context: Spawner + Metrics + Clock,
1531    {
1532        let kill = 9;
1533        runner.start(|context| async move {
1534            // Spawn a task that waits for signal
1535            let before = context.child("before").spawn(move |context| async move {
1536                let mut signal = context.stopped();
1537                let value = (&mut signal).await.unwrap();
1538                assert_eq!(value, kill);
1539                drop(signal);
1540            });
1541
1542            // Signal the tasks and wait for them to stop
1543            let result = context.child("stop").stop(kill, None).await;
1544            assert!(result.is_ok());
1545
1546            // Spawn a task after stop is called
1547            let after = context.child("after").spawn(move |context| async move {
1548                // A call to `stopped()` after `stop()` resolves immediately
1549                let value = context.stopped().await.unwrap();
1550                assert_eq!(value, kill);
1551            });
1552
1553            // Ensure both tasks complete
1554            let result = join!(before, after);
1555            assert!(result.0.is_ok());
1556            assert!(result.1.is_ok());
1557        });
1558    }
1559
1560    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1561    where
1562        R::Context: Spawner + Metrics + Clock,
1563    {
1564        let kill = 42;
1565        runner.start(|context| async move {
1566            let (started_tx, mut started_rx) = mpsc::channel(3);
1567            let counter = Arc::new(AtomicU32::new(0));
1568
1569            // Spawn 3 tasks that do cleanup work after receiving stop signal
1570            // and increment a shared counter
1571            let task = |context: R::Context, cleanup_duration: Duration| {
1572                let counter = counter.clone();
1573                let started_tx = started_tx.clone();
1574                context.spawn(move |context| async move {
1575                    // Wait for signal to be acquired
1576                    let mut signal = context.stopped();
1577                    started_tx.send(()).await.unwrap();
1578
1579                    // Increment once killed
1580                    let value = (&mut signal).await.unwrap();
1581                    assert_eq!(value, kill);
1582                    context.sleep(cleanup_duration).await;
1583                    counter.fetch_add(1, Ordering::SeqCst);
1584
1585                    // Wait to drop signal until work has been done
1586                    drop(signal);
1587                })
1588            };
1589
1590            let task1 = task(context.child("cleanup"), Duration::from_millis(10));
1591            let task2 = task(context.child("cleanup"), Duration::from_millis(20));
1592            let task3 = task(context.child("cleanup"), Duration::from_millis(30));
1593
1594            // Give tasks time to start
1595            for _ in 0..3 {
1596                started_rx.recv().await.unwrap();
1597            }
1598
1599            // Stop and verify all cleanup completed
1600            context.stop(kill, None).await.unwrap();
1601            assert_eq!(counter.load(Ordering::SeqCst), 3);
1602
1603            // Ensure all tasks completed
1604            let result = join!(task1, task2, task3);
1605            assert!(result.0.is_ok());
1606            assert!(result.1.is_ok());
1607            assert!(result.2.is_ok());
1608        });
1609    }
1610
1611    fn test_shutdown_timeout<R: Runner>(runner: R)
1612    where
1613        R::Context: Spawner + Metrics + Clock,
1614    {
1615        let kill = 42;
1616        runner.start(|context| async move {
1617            // Setup startup coordinator
1618            let (started_tx, started_rx) = oneshot::channel();
1619
1620            // Spawn a task that never completes its cleanup
1621            context.child("signal").spawn(move |context| async move {
1622                let signal = context.stopped();
1623                started_tx.send(()).unwrap();
1624                pending::<()>().await;
1625                signal.await.unwrap();
1626            });
1627
1628            // Try to stop with a timeout
1629            started_rx.await.unwrap();
1630            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1631
1632            // Assert that we got a timeout error
1633            assert!(matches!(result, Err(Error::Timeout)));
1634        });
1635    }
1636
    /// Checks the interaction of several `stop` calls:
    /// - two overlapping calls both stay pending while a task still holds its signal;
    /// - both eventually succeed;
    /// - the stop value seen by `stopped()` is the first one (`kill1`);
    /// - a `stop` issued after shutdown has completed resolves on its first poll.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays completion to test timing
            let task = context.child("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // Wait for signal to be resolved; the task expects the first stop value
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    // Keep holding the signal for a while so both `stop` calls stay pending
                    context.sleep(Duration::from_millis(50)).await;

                    // Increment counter, then release the signal
                    counter.fetch_add(1, Ordering::SeqCst);
                    drop(signal);
                }
            });

            // Wait until the task has acquired its stop signal
            started_rx.await.unwrap();

            // Issue two separate stop calls
            // The second stop call uses a different stop value that should be ignored
            let stop_task1 = context.child("stop").stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.child("stop").stop(kill2, None);
            pin_mut!(stop_task2);

            // Both of them should be awaiting completion (poll each exactly once,
            // while the task still holds its signal)
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Wait for both stop calls to complete
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // Verify first stop value wins
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // Wait for blocking task to complete; its cleanup ran exactly once
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Post-completion stop should return immediately (ready on first poll)
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1698
1699    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1700    where
1701        R::Context: Spawner + Metrics,
1702    {
1703        runner.start(|context| async move {
1704            // Spawn a task that waits for signal
1705            context.child("before").spawn(move |context| async move {
1706                let mut signal = context.stopped();
1707                let value = (&mut signal).await.unwrap();
1708
1709                // We should never reach this point
1710                assert_eq!(value, 42);
1711                drop(signal);
1712            });
1713
1714            // Ensure waker is registered
1715            reschedule().await;
1716        });
1717    }
1718
1719    fn test_spawn_dedicated<R: Runner>(runner: R)
1720    where
1721        R::Context: Spawner,
1722    {
1723        runner.start(|context| async move {
1724            let handle = context.dedicated().spawn(|_| async move { 42 });
1725            assert!(matches!(handle.await, Ok(42)));
1726        });
1727    }
1728
1729    fn test_spawn<R: Runner>(runner: R)
1730    where
1731        R::Context: Spawner + Clock,
1732    {
1733        runner.start(|context| async move {
1734            let child_handle = Arc::new(Mutex::new(None));
1735            let child_handle2 = child_handle.clone();
1736
1737            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1738            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1739            let parent_handle = context.spawn(move |context| async move {
1740                // Spawn child that completes immediately
1741                let handle = context.spawn(|_| async {});
1742
1743                // Store child handle so we can test it later
1744                *child_handle2.lock() = Some(handle);
1745
1746                parent_initialized_tx.send(()).unwrap();
1747
1748                // Parent task completes
1749                parent_complete_rx.await.unwrap();
1750            });
1751
1752            // Wait for parent task to spawn the children
1753            parent_initialized_rx.await.unwrap();
1754
1755            // Child task completes successfully
1756            let child_handle = child_handle.lock().take().unwrap();
1757            assert!(child_handle.await.is_ok());
1758
1759            // Complete the parent task
1760            parent_complete_tx.send(()).unwrap();
1761
1762            // Wait for parent task to complete successfully
1763            assert!(parent_handle.await.is_ok());
1764        });
1765    }
1766
1767    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1768    where
1769        R::Context: Spawner + Clock,
1770    {
1771        runner.start(|context| async move {
1772            let child_handle = Arc::new(Mutex::new(None));
1773            let child_handle2 = child_handle.clone();
1774
1775            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1776            let parent_handle = context.spawn(move |context| async move {
1777                // Spawn child task that hangs forever, should be aborted when parent aborts
1778                let handle = context.spawn(|_| pending::<()>());
1779
1780                // Store child task handle so we can test it later
1781                *child_handle2.lock() = Some(handle);
1782
1783                parent_initialized_tx.send(()).unwrap();
1784
1785                // Parent task runs until aborted
1786                pending::<()>().await
1787            });
1788
1789            // Wait for parent task to spawn the children
1790            parent_initialized_rx.await.unwrap();
1791
1792            // Abort parent task
1793            parent_handle.abort();
1794            assert!(matches!(parent_handle.await, Err(Error::Closed)));
1795
1796            // Child task should also resolve with error since its parent aborted
1797            let child_handle = child_handle.lock().take().unwrap();
1798            assert!(matches!(child_handle.await, Err(Error::Closed)));
1799        });
1800    }
1801
1802    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1803    where
1804        R::Context: Spawner + Clock,
1805    {
1806        runner.start(|context| async move {
1807            let child_handle = Arc::new(Mutex::new(None));
1808            let child_handle2 = child_handle.clone();
1809
1810            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1811            let parent_handle = context.spawn(move |context| async move {
1812                // Spawn child task that hangs forever, should be aborted when parent completes
1813                let handle = context.spawn(|_| pending::<()>());
1814
1815                // Store child task handle so we can test it later
1816                *child_handle2.lock() = Some(handle);
1817
1818                // Parent task completes
1819                parent_complete_rx.await.unwrap();
1820            });
1821
1822            // Fire parent completion
1823            parent_complete_tx.send(()).unwrap();
1824
1825            // Wait for parent task to complete
1826            assert!(parent_handle.await.is_ok());
1827
1828            // Child task should resolve with error since its parent has completed
1829            let child_handle = child_handle.lock().take().unwrap();
1830            assert!(matches!(child_handle.await, Err(Error::Closed)));
1831        });
1832    }
1833
1834    fn test_spawn_cascading_abort<R: Runner>(runner: R)
1835    where
1836        R::Context: Spawner + Clock,
1837    {
1838        runner.start(|context| async move {
1839            // We create the following tree of tasks. All tasks will run
1840            // indefinitely (until aborted).
1841            //
1842            //          root
1843            //     /     |     \
1844            //    /      |      \
1845            //   c0      c1      c2
1846            //  /  \    /  \    /  \
1847            // g0  g1  g2  g3  g4  g5
1848            let c0 = context.child("c0");
1849            let g0 = c0.child("g0");
1850            let g1 = c0.child("g1");
1851            let c1 = context.child("c1");
1852            let g2 = c1.child("g2");
1853            let g3 = c1.child("g3");
1854            let c2 = context.child("c2");
1855            let g4 = c2.child("g4");
1856            let g5 = c2.child("g5");
1857
1858            // Spawn tasks
1859            let handles = Arc::new(Mutex::new(Vec::new()));
1860            let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
1861            let root_task = context.spawn({
1862                let handles = handles.clone();
1863                move |_| async move {
1864                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1865                    {
1866                        let handle = context.spawn({
1867                            let handles = handles.clone();
1868                            let initialized_tx = initialized_tx.clone();
1869                            move |_| async move {
1870                                for grandchild in grandchildren {
1871                                    let handle = grandchild.spawn(|_| async {
1872                                        pending::<()>().await;
1873                                    });
1874                                    handles.lock().push(handle);
1875                                    initialized_tx.send(()).await.unwrap();
1876                                }
1877
1878                                pending::<()>().await;
1879                            }
1880                        });
1881                        handles.lock().push(handle);
1882                        initialized_tx.send(()).await.unwrap();
1883                    }
1884
1885                    pending::<()>().await;
1886                }
1887            });
1888
1889            // Wait for tasks to initialize
1890            for _ in 0..9 {
1891                initialized_rx.recv().await.unwrap();
1892            }
1893
1894            // Verify we have all 9 handles (3 children + 6 grandchildren)
1895            assert_eq!(handles.lock().len(), 9);
1896
1897            // Abort root task
1898            root_task.abort();
1899            assert!(matches!(root_task.await, Err(Error::Closed)));
1900
1901            // All handles should resolve with error due to cascading abort
1902            let handles = handles.lock().drain(..).collect::<Vec<_>>();
1903            for handle in handles {
1904                assert!(matches!(handle.await, Err(Error::Closed)));
1905            }
1906        });
1907    }
1908
1909    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1910    where
1911        R::Context: Spawner + Clock,
1912    {
1913        runner.start(|context| async move {
1914            let (child_started_tx, child_started_rx) = oneshot::channel();
1915            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1916            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1917            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1918            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1919            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1920            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1921
1922            let parent = context.spawn(move |context| async move {
1923                // Spawn a child task
1924                let child_handle = context.child("child").spawn(|_| async move {
1925                    child_started_tx.send(()).unwrap();
1926                    // Wait for signal to complete
1927                    child_complete_rx.await.unwrap();
1928                });
1929                assert!(
1930                    child_handle_tx.send(child_handle).is_ok(),
1931                    "child handle receiver dropped"
1932                );
1933
1934                // Spawn an independent sibling task
1935                let sibling_handle = context.child("sibling").spawn(move |_| async move {
1936                    sibling_started_tx.send(()).unwrap();
1937                    // Wait for signal to complete
1938                    sibling_complete_rx.await.unwrap();
1939                });
1940                assert!(
1941                    sibling_handle_tx.send(sibling_handle).is_ok(),
1942                    "sibling handle receiver dropped"
1943                );
1944
1945                // Wait for signal to complete
1946                parent_complete_rx.await.unwrap();
1947            });
1948
1949            // Wait for both to start
1950            child_started_rx.await.unwrap();
1951            sibling_started_rx.await.unwrap();
1952
1953            // Kill the sibling
1954            sibling_complete_tx.send(()).unwrap();
1955            assert!(sibling_handle_rx.await.is_ok());
1956
1957            // The child task should still be alive
1958            child_complete_tx.send(()).unwrap();
1959            assert!(child_handle_rx.await.is_ok());
1960
1961            // As well as the parent
1962            parent_complete_tx.send(()).unwrap();
1963            assert!(parent.await.is_ok());
1964        });
1965    }
1966
1967    fn test_spawn_clone_chain<R: Runner>(runner: R)
1968    where
1969        R::Context: Spawner + Clock,
1970    {
1971        runner.start(|context| async move {
1972            let (parent_started_tx, parent_started_rx) = oneshot::channel();
1973            let (child_started_tx, child_started_rx) = oneshot::channel();
1974            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1975            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1976            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1977
1978            let parent = context.child("parent").spawn({
1979                move |context| async move {
1980                    let child = context.child("child").spawn({
1981                        move |context| async move {
1982                            let grandchild = context.child("grandchild").spawn({
1983                                move |_| async move {
1984                                    grandchild_started_tx.send(()).unwrap();
1985                                    pending::<()>().await;
1986                                }
1987                            });
1988                            assert!(
1989                                grandchild_handle_tx.send(grandchild).is_ok(),
1990                                "grandchild handle receiver dropped"
1991                            );
1992                            child_started_tx.send(()).unwrap();
1993                            pending::<()>().await;
1994                        }
1995                    });
1996                    assert!(
1997                        child_handle_tx.send(child).is_ok(),
1998                        "child handle receiver dropped"
1999                    );
2000                    parent_started_tx.send(()).unwrap();
2001                    pending::<()>().await;
2002                }
2003            });
2004
2005            parent_started_rx.await.unwrap();
2006            child_started_rx.await.unwrap();
2007            grandchild_started_rx.await.unwrap();
2008
2009            let child_handle = child_handle_rx.await.unwrap();
2010            let grandchild_handle = grandchild_handle_rx.await.unwrap();
2011
2012            parent.abort();
2013            assert!(parent.await.is_err());
2014
2015            assert!(child_handle.await.is_err());
2016            assert!(grandchild_handle.await.is_err());
2017        });
2018    }
2019
2020    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
2021    where
2022        R::Context: Spawner + Clock,
2023    {
2024        runner.start(|context| async move {
2025            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
2026            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
2027
2028            let parent = context.child("parent").spawn({
2029                move |context| async move {
2030                    let clone1 = context.child("clone1");
2031                    let clone2 = clone1.child("clone2");
2032                    let clone3 = clone2.child("clone3");
2033
2034                    let leaf = clone3.spawn({
2035                        move |_| async move {
2036                            leaf_started_tx.send(()).unwrap();
2037                            pending::<()>().await;
2038                        }
2039                    });
2040
2041                    leaf_handle_tx
2042                        .send(leaf)
2043                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
2044                    pending::<()>().await;
2045                }
2046            });
2047
2048            leaf_started_rx.await.unwrap();
2049            let leaf_handle = leaf_handle_rx.await.unwrap();
2050
2051            parent.abort();
2052            assert!(parent.await.is_err());
2053            assert!(leaf_handle.await.is_err());
2054        });
2055    }
2056
2057    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2058    where
2059        R::Context: Spawner,
2060    {
2061        runner.start(|context| async move {
2062            let context = if dedicated {
2063                context.dedicated()
2064            } else {
2065                context.shared(true)
2066            };
2067
2068            let handle = context.spawn(|_| async move { 42 });
2069            let result = handle.await;
2070            assert!(matches!(result, Ok(42)));
2071        });
2072    }
2073
2074    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2075    where
2076        R::Context: Spawner + Clock,
2077    {
2078        runner.start(|context| async move {
2079            if dedicated {
2080                context.child("blocking").dedicated().spawn(|_| async move {
2081                    panic!("blocking task panicked");
2082                });
2083            } else {
2084                context
2085                    .child("blocking")
2086                    .shared(true)
2087                    .spawn(|_| async move {
2088                        panic!("blocking task panicked");
2089                    });
2090            }
2091
2092            // Loop until panic
2093            loop {
2094                context.sleep(Duration::from_millis(100)).await;
2095            }
2096        });
2097    }
2098
2099    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2100    where
2101        R::Context: Spawner + Clock,
2102    {
2103        let result: Result<(), Error> = runner.start(|context| async move {
2104            let handle = if dedicated {
2105                context.child("blocking").dedicated().spawn(|_| async move {
2106                    panic!("blocking task panicked");
2107                })
2108            } else {
2109                context
2110                    .child("blocking")
2111                    .shared(true)
2112                    .spawn(|_| async move {
2113                        panic!("blocking task panicked");
2114                    })
2115            };
2116            handle.await
2117        });
2118        assert!(matches!(result, Err(Error::Exited)));
2119    }
2120
    /// Checks that tasks keeping each other alive through channels are still
    /// released when their runtime is dropped.
    ///
    /// The name refers to the hazard being guarded against. The assertion at the
    /// end expects cleanup to succeed: once the inner runtime finishes, no clone
    /// of `dropper` may remain.
    ///
    /// `runner` only hosts the test body. The scenario itself always runs on a
    /// fresh `deterministic::Runner`.
    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Setup tracked resource: every task holds a clone, so the final
            // `Arc::try_unwrap` only succeeds if all task futures were dropped.
            let dropper = Arc::new(());
            let executor = deterministic::Runner::default();
            executor.start({
                let dropper = dropper.clone();
                move |context| async move {
                    // Create tasks with circular dependencies through channels
                    let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                    let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                    let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                    // Task 1 holds tx2 and waits on rx1
                    context.child("task1").spawn({
                        let setup_tx = setup_tx.clone();
                        let dropper = dropper.clone();
                        move |_| async move {
                            // Setup deadlock and mark ready
                            tx2.send(()).unwrap();
                            rx1.recv().await.unwrap();
                            setup_tx.send(()).unwrap();

                            // Wait forever: `rx1` only closes once task 2 drops `tx1`,
                            // and task 2 is itself blocked waiting on this task's `tx2`
                            while rx1.recv().await.is_some() {}
                            drop(tx2);
                            drop(dropper);
                        }
                    });

                    // Task 2 holds tx1 and waits on rx2 (mirror image of task 1)
                    context.child("task2").spawn(move |_| async move {
                        // Setup deadlock and mark ready
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Wait forever (see task 1)
                        while rx2.recv().await.is_some() {}
                        drop(tx1);
                        drop(dropper);
                    });

                    // Wait for tasks to start; the root then returns while both
                    // tasks remain blocked in their loops
                    setup_rx.recv().await.unwrap();
                    setup_rx.recv().await.unwrap();
                }
            });

            // After runtime drop, both tasks should be cleaned up
            Arc::try_unwrap(dropper).expect("references remaining");
        });
    }
2174
    /// Checks that waking a task after its runtime has been torn down is
    /// tolerated.
    ///
    /// A spawned task leaks its `Waker` to the root future. The root returns that
    /// waker inside a guard, and the guard calls `wake_by_ref` when it is dropped,
    /// which happens after `runner.start` has returned.
    fn test_late_waker<R: Runner>(runner: R)
    where
        R::Context: Metrics + Spawner,
    {
        // A future that captures its waker and sends it to the caller, then
        // stays pending forever.
        // NOTE(review): `sent` is redundant with `tx.take()` (which already makes the
        // send one-shot) but is harmless.
        struct CaptureWaker {
            tx: Option<oneshot::Sender<Waker>>,
            sent: bool,
        }
        impl Future for CaptureWaker {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
                if !self.sent {
                    if let Some(tx) = self.tx.take() {
                        // Send a clone of the current task's waker to the root
                        // (a send failure is deliberately ignored)
                        let _ = tx.send(cx.waker().clone());
                    }
                    self.sent = true;
                }
                Poll::Pending
            }
        }

        // A guard that wakes the captured waker on drop.
        struct WakeOnDrop(Option<Waker>);
        impl Drop for WakeOnDrop {
            fn drop(&mut self) {
                if let Some(w) = self.0.take() {
                    w.wake_by_ref();
                }
            }
        }

        // Run the executor to completion; the guard escapes as the runner's output
        let holder = runner.start(|context| async move {
            // Wire a oneshot to receive the task waker.
            let (tx, rx) = oneshot::channel::<Waker>();

            // Spawn a task that registers its waker and then stays pending.
            context.child("capture_waker").spawn(move |_| async move {
                CaptureWaker {
                    tx: Some(tx),
                    sent: false,
                }
                .await;
            });

            // Ensure the spawned task runs and registers its waker.
            utils::reschedule().await;

            // Receive the waker from the spawned task.
            let waker = rx.await.expect("waker not received");

            // Return a guard that will wake after the runtime has dropped.
            WakeOnDrop(Some(waker))
        });

        // Dropping the guard after the runtime has torn down will trigger a wake on
        // a task whose executor has been dropped.
        drop(holder);
    }
2237
2238    fn test_metrics<R: Runner>(runner: R)
2239    where
2240        R::Context: Metrics,
2241    {
2242        runner.start(|context| async move {
2243            // Assert label
2244            assert_eq!(context.name().label, "");
2245
2246            // Register a metric
2247            let counter = Counter::<u64>::default();
2248            let _registered = context.register("test", "test", counter.clone());
2249
2250            // Increment the counter
2251            counter.inc();
2252
2253            // Encode metrics
2254            let buffer = context.encode();
2255            assert!(buffer.contains("test_total 1"));
2256
2257            // Nested context
2258            let context = context.child("nested");
2259            let nested_counter = Counter::<u64>::default();
2260            let _nested_registered = context.register("test", "test", nested_counter.clone());
2261
2262            // Increment the counter
2263            nested_counter.inc();
2264
2265            // Encode metrics
2266            let buffer = context.encode();
2267            assert!(buffer.contains("nested_test_total 1"));
2268            assert!(buffer.contains("test_total 1"));
2269        });
2270    }
2271
2272    fn test_metrics_with_attribute<R: Runner>(runner: R)
2273    where
2274        R::Context: Metrics,
2275    {
2276        runner.start(|context| async move {
2277            // Create context with a attribute
2278            let ctx_epoch5 = context.child("consensus").with_attribute("epoch", "e5");
2279
2280            // Register a metric with the attribute
2281            let counter = Counter::<u64>::default();
2282            let _epoch5 = ctx_epoch5.register("votes", "vote count", counter.clone());
2283            counter.inc();
2284
2285            // Encode and verify the attribute appears as a label
2286            let buffer = context.encode();
2287            assert!(
2288                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2289                "Expected metric with epoch attribute, got: {}",
2290                buffer
2291            );
2292
2293            // Create context with different epoch attribute (same metric name)
2294            let ctx_epoch6 = context.child("consensus").with_attribute("epoch", "e6");
2295            let counter2 = Counter::<u64>::default();
2296            let _epoch6 = ctx_epoch6.register("votes", "vote count", counter2.clone());
2297            counter2.inc();
2298            counter2.inc();
2299
2300            // Both should appear in encoded output with canonical format (single HELP/TYPE)
2301            let buffer = context.encode();
2302            assert!(
2303                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2304                "Expected metric with epoch=e5, got: {}",
2305                buffer
2306            );
2307            assert!(
2308                buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
2309                "Expected metric with epoch=e6, got: {}",
2310                buffer
2311            );
2312
2313            // Verify canonical format: HELP and TYPE should appear exactly once
2314            assert_eq!(
2315                buffer.matches("# HELP consensus_votes").count(),
2316                1,
2317                "HELP should appear exactly once, got: {}",
2318                buffer
2319            );
2320            assert_eq!(
2321                buffer.matches("# TYPE consensus_votes").count(),
2322                1,
2323                "TYPE should appear exactly once, got: {}",
2324                buffer
2325            );
2326
2327            // Multiple attributes
2328            let ctx_multi = context
2329                .child("engine")
2330                .with_attribute("region", "us")
2331                .with_attribute("instance", "i1");
2332            let counter3 = Counter::<u64>::default();
2333            let _multi = ctx_multi.register("requests", "request count", counter3.clone());
2334            counter3.inc();
2335
2336            let buffer = context.encode();
2337            assert!(
2338                buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
2339                "Expected metric with sorted attributes, got: {}",
2340                buffer
2341            );
2342        });
2343    }
2344
2345    #[test]
2346    fn test_deterministic_metrics_with_attribute() {
2347        let executor = deterministic::Runner::default();
2348        test_metrics_with_attribute(executor);
2349    }
2350
2351    #[test]
2352    fn test_tokio_metrics_with_attribute() {
2353        let runner = tokio::Runner::default();
2354        test_metrics_with_attribute(runner);
2355    }
2356
2357    fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2358    where
2359        R::Context: Metrics,
2360    {
2361        runner.start(|context| async move {
2362            // Create context with attribute, then nest a label
2363            let ctx = context
2364                .child("orchestrator")
2365                .with_attribute("epoch", "e5")
2366                .child("engine");
2367
2368            // Register a metric
2369            let counter = Counter::<u64>::default();
2370            let _registered = ctx.register("votes", "vote count", counter.clone());
2371            counter.inc();
2372
2373            // Verify the attribute is preserved through the nested label
2374            let buffer = context.encode();
2375            assert!(
2376                buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2377                "Expected metric with preserved epoch attribute, got: {}",
2378                buffer
2379            );
2380
2381            // Multiple levels of nesting with attributes at different levels
2382            let ctx2 = context
2383                .child("outer")
2384                .with_attribute("region", "us")
2385                .child("middle")
2386                .with_attribute("az", "east")
2387                .child("inner");
2388
2389            let counter2 = Counter::<u64>::default();
2390            let _registered2 = ctx2.register("requests", "request count", counter2.clone());
2391            counter2.inc();
2392            counter2.inc();
2393
2394            let buffer = context.encode();
2395            assert!(
2396                buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2397                "Expected metric with all attributes preserved and sorted, got: {}",
2398                buffer
2399            );
2400        });
2401    }
2402
2403    #[test]
2404    fn test_deterministic_metrics_attribute_with_nested_label() {
2405        let executor = deterministic::Runner::default();
2406        test_metrics_attribute_with_nested_label(executor);
2407    }
2408
2409    #[test]
2410    fn test_tokio_metrics_attribute_with_nested_label() {
2411        let runner = tokio::Runner::default();
2412        test_metrics_attribute_with_nested_label(runner);
2413    }
2414
2415    fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2416    where
2417        R::Context: Metrics,
2418    {
2419        runner.start(|context| async move {
2420            // Create two separate sub-contexts, each with their own attribute
2421            let ctx_a = context.child("component_a").with_attribute("epoch", 1);
2422            let ctx_b = context.child("component_b").with_attribute("epoch", 2);
2423
2424            // Register metrics in ctx_a
2425            let c1 = Counter::<u64>::default();
2426            let _ctx_a_requests = ctx_a.register("requests", "help", c1);
2427
2428            // Register metrics in ctx_b
2429            let c2 = Counter::<u64>::default();
2430            let _ctx_b_requests = ctx_b.register("requests", "help", c2);
2431
2432            // Register another metric in ctx_a AFTER ctx_b was used
2433            let c3 = Counter::<u64>::default();
2434            let _ctx_a_errors = ctx_a.register("errors", "help", c3);
2435
2436            let output = context.encode();
2437
2438            // ctx_a metrics should only have epoch=1
2439            assert!(
2440                output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2441                "ctx_a requests should have epoch=1: {output}"
2442            );
2443            assert!(
2444                output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2445                "ctx_a errors should have epoch=1: {output}"
2446            );
2447            assert!(
2448                !output.contains("component_a_requests_total{epoch=\"2\"}"),
2449                "ctx_a requests should not have epoch=2: {output}"
2450            );
2451
2452            // ctx_b metrics should only have epoch=2
2453            assert!(
2454                output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2455                "ctx_b should have epoch=2: {output}"
2456            );
2457            assert!(
2458                !output.contains("component_b_requests_total{epoch=\"1\"}"),
2459                "ctx_b should not have epoch=1: {output}"
2460            );
2461        });
2462    }
2463
2464    #[test]
2465    fn test_deterministic_metrics_attributes_isolated_between_contexts() {
2466        let executor = deterministic::Runner::default();
2467        test_metrics_attributes_isolated_between_contexts(executor);
2468    }
2469
2470    #[test]
2471    fn test_tokio_metrics_attributes_isolated_between_contexts() {
2472        let runner = tokio::Runner::default();
2473        test_metrics_attributes_isolated_between_contexts(runner);
2474    }
2475
2476    /// Regression test for https://github.com/commonwarexyz/monorepo/issues/3485.
2477    ///
2478    /// Verifies the documented guarantee that runtime task metrics ignore context
2479    /// attributes: spawning with a varying `with_attribute` (as the marshaled
2480    /// consensus code does for each round) must not create per-value entries in
2481    /// `runtime_tasks_spawned` / `runtime_tasks_running`.
2482    fn test_metrics_spawn_attribute_cardinality<R: Runner>(runner: R)
2483    where
2484        R::Context: Spawner + Metrics + Clock,
2485    {
2486        runner.start(|context| async move {
2487            const ROUNDS: u64 = 128;
2488
2489            let mut handles = Vec::with_capacity(ROUNDS as usize);
2490            for round in 0..ROUNDS {
2491                let handle = context
2492                    .child("deferred_verify")
2493                    .with_attribute("round", round)
2494                    .spawn(move |_| async move { round });
2495                handles.push(handle);
2496            }
2497            for (expected, handle) in handles.into_iter().enumerate() {
2498                assert_eq!(handle.await.expect("task failed"), expected as u64);
2499            }
2500
2501            // handle.await resolves when the task's output is ready, but
2502            // the running-gauge decrement fires on task-struct drop which
2503            // may lag slightly. Yield to the executor so it can run cleanup.
2504            while count_running_tasks(&context, "deferred_verify") > 0 {
2505                context.sleep(Duration::from_millis(10)).await;
2506            }
2507            let buffer = context.encode();
2508
2509            // Count occurrences of each runtime task metric for our label. If
2510            // attributes were incorrectly folded into the task family key, we
2511            // would see ROUNDS distinct time series instead of one.
2512            let spawned_lines = buffer
2513                .lines()
2514                .filter(|line| {
2515                    line.starts_with("runtime_tasks_spawned_total{")
2516                        && line.contains("name=\"deferred_verify\"")
2517                })
2518                .count();
2519            let running_lines = buffer
2520                .lines()
2521                .filter(|line| {
2522                    line.starts_with("runtime_tasks_running{")
2523                        && line.contains("name=\"deferred_verify\"")
2524                })
2525                .count();
2526            assert_eq!(
2527                spawned_lines, 1,
2528                "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
2529            );
2530            assert_eq!(
2531                running_lines, 1,
2532                "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
2533            );
2534
2535            // The single spawned-counter entry should reflect every round.
2536            let spawned_value = format!(
2537                "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
2538            );
2539            assert!(
2540                buffer.contains(&spawned_value),
2541                "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
2542            );
2543            let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
2544            assert!(
2545                buffer.contains(running_value),
2546                "expected running gauge to return to 0, got: {buffer}",
2547            );
2548
2549            // The per-round attribute must not surface on task metrics (the
2550            // task `Label` does not include context attributes).
2551            assert!(
2552                !buffer
2553                    .lines()
2554                    .any(|line| line.starts_with("runtime_tasks_")
2555                        && line.contains("round=")),
2556                "task metrics must not carry `round` attribute: {buffer}",
2557            );
2558        });
2559    }
2560
2561    #[test]
2562    fn test_deterministic_metrics_spawn_attribute_cardinality() {
2563        let executor = deterministic::Runner::default();
2564        test_metrics_spawn_attribute_cardinality(executor);
2565    }
2566
2567    #[test]
2568    fn test_tokio_metrics_spawn_attribute_cardinality() {
2569        let runner = tokio::Runner::default();
2570        test_metrics_spawn_attribute_cardinality(runner);
2571    }
2572
2573    fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2574    where
2575        R::Context: Metrics,
2576    {
2577        runner.start(|context| async move {
2578            // Create two contexts with same attributes but different order
2579            let ctx_ab = context
2580                .child("service")
2581                .with_attribute("region", "us")
2582                .with_attribute("env", "prod");
2583
2584            let ctx_ba = context
2585                .child("service")
2586                .with_attribute("env", "prod")
2587                .with_attribute("region", "us");
2588
2589            // Register via first context
2590            let c1 = Counter::<u64>::default();
2591            let _requests = ctx_ab.register("requests", "help", c1.clone());
2592            c1.inc();
2593
2594            // Register via second context - same attributes, different metric
2595            let c2 = Counter::<u64>::default();
2596            let _errors = ctx_ba.register("errors", "help", c2.clone());
2597            c2.inc();
2598            c2.inc();
2599
2600            let output = context.encode();
2601
2602            // Both should have the same label order (alphabetically sorted: env, region)
2603            assert!(
2604                output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2605                "requests should have sorted labels: {output}"
2606            );
2607            assert!(
2608                output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2609                "errors should have sorted labels: {output}"
2610            );
2611
2612            // Should NOT have reverse order
2613            assert!(
2614                !output.contains("region=\"us\",env=\"prod\""),
2615                "should not have unsorted label order: {output}"
2616            );
2617        });
2618    }
2619
2620    #[test]
2621    fn test_deterministic_metrics_attributes_sorted_deterministically() {
2622        let executor = deterministic::Runner::default();
2623        test_metrics_attributes_sorted_deterministically(executor);
2624    }
2625
2626    #[test]
2627    fn test_tokio_metrics_attributes_sorted_deterministically() {
2628        let runner = tokio::Runner::default();
2629        test_metrics_attributes_sorted_deterministically(runner);
2630    }
2631
2632    fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
2633    where
2634        R::Context: Metrics,
2635    {
2636        runner.start(|context| async move {
2637            // Service A: plain, no nested labels
2638            let svc_a = context.child("service_a");
2639
2640            // Service A with attribute (same top-level label, different context)
2641            let svc_a_v2 = context.child("service_a").with_attribute("version", 2);
2642
2643            // Service B with nested label: service_b_worker
2644            let svc_b_worker = context.child("service_b").child("worker");
2645
2646            // Service B with nested label AND attribute
2647            let svc_b_worker_shard = context
2648                .child("service_b")
2649                .child("worker")
2650                .with_attribute("shard", 99);
2651
2652            // Service B different nested label: service_b_manager
2653            let svc_b_manager = context.child("service_b").child("manager");
2654
2655            // Service C: plain, proves no cross-service contamination
2656            let svc_c = context.child("service_c");
2657
2658            // Register metrics in all contexts
2659            let c1 = Counter::<u64>::default();
2660            let _svc_a = svc_a.register("requests", "help", c1);
2661
2662            let c2 = Counter::<u64>::default();
2663            let _svc_a_v2 = svc_a_v2.register("requests", "help", c2);
2664
2665            let c3 = Counter::<u64>::default();
2666            let _svc_b_worker = svc_b_worker.register("tasks", "help", c3);
2667
2668            let c4 = Counter::<u64>::default();
2669            let _svc_b_worker_shard = svc_b_worker_shard.register("tasks", "help", c4);
2670
2671            let c5 = Counter::<u64>::default();
2672            let _svc_b_manager = svc_b_manager.register("decisions", "help", c5);
2673
2674            let c6 = Counter::<u64>::default();
2675            let _svc_c = svc_c.register("requests", "help", c6);
2676
2677            let output = context.encode();
2678
2679            // Service A plain and attributed both exist independently
2680            assert!(
2681                output.contains("service_a_requests_total 0"),
2682                "svc_a plain should exist: {output}"
2683            );
2684            assert!(
2685                output.contains("service_a_requests_total{version=\"2\"} 0"),
2686                "svc_a_v2 should have version=2: {output}"
2687            );
2688
2689            // Service B worker: plain and attributed versions
2690            assert!(
2691                output.contains("service_b_worker_tasks_total 0"),
2692                "svc_b_worker plain should exist: {output}"
2693            );
2694            assert!(
2695                output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
2696                "svc_b_worker_shard should have shard=99: {output}"
2697            );
2698
2699            // Service B manager: no attributes
2700            assert!(
2701                output.contains("service_b_manager_decisions_total 0"),
2702                "svc_b_manager should have no attributes: {output}"
2703            );
2704            assert!(
2705                !output.contains("service_b_manager_decisions_total{"),
2706                "svc_b_manager should have no attributes at all: {output}"
2707            );
2708
2709            // Service C: no attributes, no contamination
2710            assert!(
2711                output.contains("service_c_requests_total 0"),
2712                "svc_c should have no attributes: {output}"
2713            );
2714            assert!(
2715                !output.contains("service_c_requests_total{"),
2716                "svc_c should have no attributes at all: {output}"
2717            );
2718
2719            // Cross-contamination checks
2720            assert!(
2721                !output.contains("service_b_manager_decisions_total{shard="),
2722                "svc_b_manager should not have shard: {output}"
2723            );
2724            assert!(
2725                !output.contains("service_a_requests_total{shard="),
2726                "svc_a should not have shard: {output}"
2727            );
2728            assert!(
2729                !output.contains("service_c_requests_total{version="),
2730                "svc_c should not have version: {output}"
2731            );
2732        });
2733    }
2734
2735    #[test]
2736    fn test_deterministic_metrics_nested_labels_with_attributes() {
2737        let executor = deterministic::Runner::default();
2738        test_metrics_nested_labels_with_attributes(executor);
2739    }
2740
2741    #[test]
2742    fn test_tokio_metrics_nested_labels_with_attributes() {
2743        let runner = tokio::Runner::default();
2744        test_metrics_nested_labels_with_attributes(runner);
2745    }
2746
2747    fn test_metrics_family_with_attributes<R: Runner>(runner: R)
2748    where
2749        R::Context: Metrics,
2750    {
2751        runner.start(|context| async move {
2752            #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
2753            struct RequestLabels {
2754                method: String,
2755                status: u16,
2756            }
2757
2758            // Create context with attribute
2759            let ctx = context
2760                .child("api")
2761                .with_attribute("region", "us_east")
2762                .with_attribute("env", "prod");
2763
2764            // Register a Family metric
2765            let requests: Family<RequestLabels, Counter<u64>> = Family::default();
2766            let _requests = ctx.register("requests", "HTTP requests", requests.clone());
2767
2768            // Increment counters for different label combinations
2769            requests
2770                .get_or_create(&RequestLabels {
2771                    method: "GET".to_string(),
2772                    status: 200,
2773                })
2774                .inc();
2775            requests
2776                .get_or_create(&RequestLabels {
2777                    method: "POST".to_string(),
2778                    status: 201,
2779                })
2780                .inc();
2781            requests
2782                .get_or_create(&RequestLabels {
2783                    method: "GET".to_string(),
2784                    status: 404,
2785                })
2786                .inc();
2787
2788            let output = context.encode();
2789
2790            // Context attributes appear first (alphabetically sorted), then Family labels
2791            // Context attributes: env="prod", region="us_east"
2792            // Family labels: method, status
2793            assert!(
2794                output.contains(
2795                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
2796                ),
2797                "GET 200 should have merged labels: {output}"
2798            );
2799            assert!(
2800                output.contains(
2801                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
2802                ),
2803                "POST 201 should have merged labels: {output}"
2804            );
2805            assert!(
2806                output.contains(
2807                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
2808                ),
2809                "GET 404 should have merged labels: {output}"
2810            );
2811
2812            // Create another context WITHOUT attributes to verify isolation
2813            let ctx_plain = context.child("api_plain");
2814            let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
2815            let _plain_requests =
2816                ctx_plain.register("requests", "HTTP requests", plain_requests.clone());
2817
2818            plain_requests
2819                .get_or_create(&RequestLabels {
2820                    method: "DELETE".to_string(),
2821                    status: 204,
2822                })
2823                .inc();
2824
2825            let output = context.encode();
2826
2827            // Plain context should have Family labels but no context attributes
2828            assert!(
2829                output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
2830                "plain DELETE should have only family labels: {output}"
2831            );
2832            assert!(
2833                !output.contains("api_plain_requests_total{env="),
2834                "plain should not have env attribute: {output}"
2835            );
2836            assert!(
2837                !output.contains("api_plain_requests_total{region="),
2838                "plain should not have region attribute: {output}"
2839            );
2840        });
2841    }
2842
2843    #[test]
2844    fn test_deterministic_metrics_family_with_attributes() {
2845        let executor = deterministic::Runner::default();
2846        test_metrics_family_with_attributes(executor);
2847    }
2848
2849    #[test]
2850    fn test_tokio_metrics_family_with_attributes() {
2851        let runner = tokio::Runner::default();
2852        test_metrics_family_with_attributes(runner);
2853    }
2854
2855    fn test_register_and_encode<R: Runner>(runner: R)
2856    where
2857        R::Context: Metrics,
2858    {
2859        runner.start(|context| async move {
2860            let counter =
2861                context
2862                    .child("engine")
2863                    .register("votes", "vote count", Counter::<u64>::default());
2864            counter.inc();
2865
2866            let buffer = context.encode();
2867            assert!(
2868                buffer.contains("engine_votes_total 1"),
2869                "registered metric should appear in encode: {buffer}"
2870            );
2871        });
2872    }
2873
2874    #[test]
2875    fn test_deterministic_register_and_encode() {
2876        let executor = deterministic::Runner::default();
2877        test_register_and_encode(executor);
2878    }
2879
2880    #[test]
2881    fn test_tokio_register_and_encode() {
2882        let runner = tokio::Runner::default();
2883        test_register_and_encode(runner);
2884    }
2885
2886    fn test_register_drop_removes_metrics<R: Runner>(runner: R)
2887    where
2888        R::Context: Metrics,
2889    {
2890        runner.start(|context| async move {
2891            let permanent = context.child("permanent").register(
2892                "counter",
2893                "permanent counter",
2894                Counter::<u64>::default(),
2895            );
2896            permanent.inc();
2897
2898            let counter =
2899                context
2900                    .child("engine")
2901                    .register("votes", "vote count", Counter::<u64>::default());
2902            counter.inc();
2903
2904            let buffer = context.encode();
2905            assert!(buffer.contains("permanent_counter_total 1"));
2906            assert!(buffer.contains("engine_votes_total 1"));
2907
2908            drop(counter);
2909
2910            let buffer = context.encode();
2911            assert!(
2912                buffer.contains("permanent_counter_total 1"),
2913                "other registered metrics should survive handle drop: {buffer}"
2914            );
2915            assert!(
2916                !buffer.contains("engine_votes"),
2917                "metric should be removed after handle drop: {buffer}"
2918            );
2919        });
2920    }
2921
2922    #[test]
2923    fn test_deterministic_register_drop_removes_metrics() {
2924        let executor = deterministic::Runner::default();
2925        test_register_drop_removes_metrics(executor);
2926    }
2927
2928    #[test]
2929    fn test_tokio_register_drop_removes_metrics() {
2930        let runner = tokio::Runner::default();
2931        test_register_drop_removes_metrics(runner);
2932    }
2933
2934    fn test_register_with_attributes<R: Runner>(runner: R)
2935    where
2936        R::Context: Metrics,
2937    {
2938        runner.start(|context| async move {
2939            let epoch1 = context.child("engine").with_attribute("epoch", 1).register(
2940                "votes",
2941                "vote count",
2942                Counter::<u64>::default(),
2943            );
2944            epoch1.inc();
2945
2946            let epoch2 = context.child("engine").with_attribute("epoch", 2).register(
2947                "votes",
2948                "vote count",
2949                Counter::<u64>::default(),
2950            );
2951            epoch2.inc();
2952            epoch2.inc();
2953
2954            let buffer = context.encode();
2955            assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
2956            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2957
2958            assert_eq!(
2959                buffer.matches("# HELP engine_votes").count(),
2960                1,
2961                "HELP should appear once: {buffer}"
2962            );
2963            assert_eq!(
2964                buffer.matches("# TYPE engine_votes").count(),
2965                1,
2966                "TYPE should appear once: {buffer}"
2967            );
2968
2969            drop(epoch1);
2970            let buffer = context.encode();
2971            assert!(
2972                !buffer.contains("epoch=\"1\""),
2973                "epoch 1 should be gone: {buffer}"
2974            );
2975            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2976
2977            drop(epoch2);
2978            let buffer = context.encode();
2979            assert!(
2980                !buffer.contains("engine_votes"),
2981                "all epoch metrics should be gone: {buffer}"
2982            );
2983        });
2984    }
2985
2986    #[test]
2987    fn test_deterministic_register_with_attributes() {
2988        let executor = deterministic::Runner::default();
2989        test_register_with_attributes(executor);
2990    }
2991
2992    #[test]
2993    fn test_tokio_register_with_attributes() {
2994        let runner = tokio::Runner::default();
2995        test_register_with_attributes(runner);
2996    }
2997
2998    fn test_reregister_after_drop<R: Runner>(runner: R)
2999    where
3000        R::Context: Metrics,
3001    {
3002        runner.start(|context| async move {
3003            let votes = context.child("engine").with_attribute("epoch", 1).register(
3004                "votes",
3005                "vote count",
3006                Counter::<u64>::default(),
3007            );
3008            drop(votes);
3009
3010            let replacement = context.child("engine").with_attribute("epoch", 1).register(
3011                "votes",
3012                "vote count",
3013                Counter::<u64>::default(),
3014            );
3015            drop(replacement);
3016        });
3017    }
3018
3019    #[test]
3020    fn test_deterministic_reregister_after_drop() {
3021        let executor = deterministic::Runner::default();
3022        test_reregister_after_drop(executor);
3023    }
3024
3025    #[test]
3026    fn test_tokio_reregister_after_drop() {
3027        let runner = tokio::Runner::default();
3028        test_reregister_after_drop(runner);
3029    }
3030
3031    fn test_register_clone_keeps_metric_alive<R: Runner>(runner: R)
3032    where
3033        R::Context: Metrics,
3034    {
3035        runner.start(|context| async move {
3036            let registered =
3037                context
3038                    .child("engine")
3039                    .register("votes", "vote count", Counter::<u64>::default());
3040            registered.inc();
3041            let clone = registered.clone();
3042
3043            let buffer = context.encode();
3044            assert!(
3045                buffer.contains("engine_votes_total 1"),
3046                "metric should remain registered while any handle exists: {buffer}"
3047            );
3048
3049            drop(registered);
3050            let buffer = context.encode();
3051            assert!(
3052                buffer.contains("engine_votes_total 1"),
3053                "metric should survive while clone is retained: {buffer}"
3054            );
3055
3056            drop(clone);
3057            let buffer = context.encode();
3058            assert!(
3059                !buffer.contains("engine_votes"),
3060                "metric should be removed when all handle clones are dropped: {buffer}"
3061            );
3062        });
3063    }
3064
3065    #[test]
3066    fn test_deterministic_register_clone_keeps_metric_alive() {
3067        let executor = deterministic::Runner::default();
3068        test_register_clone_keeps_metric_alive(executor);
3069    }
3070
3071    #[test]
3072    fn test_tokio_register_clone_keeps_metric_alive() {
3073        let runner = tokio::Runner::default();
3074        test_register_clone_keeps_metric_alive(runner);
3075    }
3076
3077    fn test_encode_single_eof<R: Runner>(runner: R)
3078    where
3079        R::Context: Metrics,
3080    {
3081        runner.start(|context| async move {
3082            let root_counter = context.register("root", "root metric", Counter::<u64>::default());
3083            root_counter.inc();
3084
3085            let child =
3086                context
3087                    .child("engine")
3088                    .register("ops", "child metric", Counter::<u64>::default());
3089            child.inc();
3090
3091            let buffer = context.encode();
3092            assert!(
3093                buffer.contains("root_total 1"),
3094                "root metric missing: {buffer}"
3095            );
3096            assert!(
3097                buffer.contains("engine_ops_total 1"),
3098                "child metric missing: {buffer}"
3099            );
3100            assert_eq!(
3101                buffer.matches("# EOF").count(),
3102                1,
3103                "expected exactly one EOF marker: {buffer}"
3104            );
3105            assert!(
3106                buffer.ends_with("# EOF\n"),
3107                "EOF must be the last line: {buffer}"
3108            );
3109        });
3110    }
3111
3112    #[test]
3113    fn test_deterministic_encode_single_eof() {
3114        let executor = deterministic::Runner::default();
3115        test_encode_single_eof(executor);
3116    }
3117
3118    #[test]
3119    fn test_tokio_encode_single_eof() {
3120        let runner = tokio::Runner::default();
3121        test_encode_single_eof(runner);
3122    }
3123
3124    fn test_family_with_attributes<R: Runner>(runner: R)
3125    where
3126        R::Context: Metrics,
3127    {
3128        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
3129        struct Peer {
3130            name: String,
3131        }
3132        impl EncodeLabelSet for Peer {
3133            fn encode(&self, encoder: &mut LabelSetEncoder<'_>) -> Result<(), std::fmt::Error> {
3134                let mut label = encoder.encode_label();
3135                let mut key = label.encode_label_key()?;
3136                EncodeLabelKey::encode(&"peer", &mut key)?;
3137                let mut value = key.encode_label_value()?;
3138                EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
3139                value.finish()
3140            }
3141        }
3142
3143        runner.start(|context| async move {
3144            let family = context
3145                .child("batcher")
3146                .with_attribute("epoch", 1)
3147                .register(
3148                    "votes",
3149                    "votes per peer",
3150                    Family::<Peer, Counter>::default(),
3151                );
3152            family
3153                .get_or_create(&Peer {
3154                    name: "alice".into(),
3155                })
3156                .inc();
3157            family.get_or_create(&Peer { name: "bob".into() }).inc();
3158
3159            let buffer = context.encode();
3160            assert!(
3161                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
3162                "family with attributes should combine labels: {buffer}"
3163            );
3164            assert!(
3165                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
3166                "family with attributes should combine labels: {buffer}"
3167            );
3168
3169            drop(family);
3170            let buffer = context.encode();
3171            assert!(
3172                !buffer.contains("batcher_votes"),
3173                "family metrics should be removed: {buffer}"
3174            );
3175        });
3176    }
3177
3178    #[test]
3179    fn test_deterministic_family_with_attributes() {
3180        let executor = deterministic::Runner::default();
3181        test_family_with_attributes(executor);
3182    }
3183
3184    #[test]
3185    fn test_tokio_family_with_attributes() {
3186        let runner = tokio::Runner::default();
3187        test_family_with_attributes(runner);
3188    }
3189
3190    #[test]
3191    fn test_deterministic_future() {
3192        let runner = deterministic::Runner::default();
3193        test_error_future(runner);
3194    }
3195
3196    #[test]
3197    fn test_deterministic_clock_sleep() {
3198        let executor = deterministic::Runner::default();
3199        test_clock_sleep(executor);
3200    }
3201
3202    #[test]
3203    fn test_deterministic_clock_sleep_until() {
3204        let executor = deterministic::Runner::default();
3205        test_clock_sleep_until(executor);
3206    }
3207
3208    #[test]
3209    fn test_deterministic_clock_sleep_until_far_future() {
3210        let executor = deterministic::Runner::default();
3211        test_clock_sleep_until_far_future(executor);
3212    }
3213
3214    #[test]
3215    fn test_deterministic_clock_timeout() {
3216        let executor = deterministic::Runner::default();
3217        test_clock_timeout(executor);
3218    }
3219
3220    #[test]
3221    fn test_deterministic_root_finishes() {
3222        let executor = deterministic::Runner::default();
3223        test_root_finishes(executor);
3224    }
3225
3226    #[test]
3227    fn test_deterministic_spawn_after_abort() {
3228        let executor = deterministic::Runner::default();
3229        test_spawn_after_abort(executor);
3230    }
3231
3232    #[test]
3233    fn test_deterministic_spawn_abort() {
3234        let executor = deterministic::Runner::default();
3235        test_spawn_abort(executor, false, false);
3236    }
3237
3238    #[test]
3239    #[should_panic(expected = "blah")]
3240    fn test_deterministic_panic_aborts_root() {
3241        let runner = deterministic::Runner::default();
3242        test_panic_aborts_root(runner);
3243    }
3244
3245    #[test]
3246    #[should_panic(expected = "blah")]
3247    fn test_deterministic_panic_aborts_root_caught() {
3248        let cfg = deterministic::Config::default().with_catch_panics(true);
3249        let runner = deterministic::Runner::new(cfg);
3250        test_panic_aborts_root(runner);
3251    }
3252
3253    #[test]
3254    #[should_panic(expected = "blah")]
3255    fn test_deterministic_panic_aborts_spawn() {
3256        let executor = deterministic::Runner::default();
3257        test_panic_aborts_spawn(executor);
3258    }
3259
3260    #[test]
3261    fn test_deterministic_panic_aborts_spawn_caught() {
3262        let cfg = deterministic::Config::default().with_catch_panics(true);
3263        let executor = deterministic::Runner::new(cfg);
3264        test_panic_aborts_spawn_caught(executor);
3265    }
3266
3267    #[test]
3268    #[should_panic(expected = "boom")]
3269    fn test_deterministic_multiple_panics() {
3270        let executor = deterministic::Runner::default();
3271        test_multiple_panics(executor);
3272    }
3273
3274    #[test]
3275    fn test_deterministic_multiple_panics_caught() {
3276        let cfg = deterministic::Config::default().with_catch_panics(true);
3277        let executor = deterministic::Runner::new(cfg);
3278        test_multiple_panics_caught(executor);
3279    }
3280
3281    #[test]
3282    fn test_deterministic_select() {
3283        let executor = deterministic::Runner::default();
3284        test_select(executor);
3285    }
3286
3287    #[test]
3288    fn test_deterministic_select_loop() {
3289        let executor = deterministic::Runner::default();
3290        test_select_loop(executor);
3291    }
3292
3293    #[test]
3294    fn test_deterministic_storage_operations() {
3295        let executor = deterministic::Runner::default();
3296        test_storage_operations(executor);
3297    }
3298
3299    #[test]
3300    fn test_deterministic_blob_read_write() {
3301        let executor = deterministic::Runner::default();
3302        test_blob_read_write(executor);
3303    }
3304
3305    #[test]
3306    fn test_deterministic_blob_resize() {
3307        let executor = deterministic::Runner::default();
3308        test_blob_resize(executor);
3309    }
3310
3311    #[test]
3312    fn test_deterministic_many_partition_read_write() {
3313        let executor = deterministic::Runner::default();
3314        test_many_partition_read_write(executor);
3315    }
3316
3317    #[test]
3318    fn test_deterministic_blob_read_past_length() {
3319        let executor = deterministic::Runner::default();
3320        test_blob_read_past_length(executor);
3321    }
3322
3323    #[test]
3324    fn test_deterministic_blob_clone_and_concurrent_read() {
3325        // Run test
3326        let executor = deterministic::Runner::default();
3327        test_blob_clone_and_concurrent_read(executor);
3328    }
3329
3330    #[test]
3331    fn test_deterministic_shutdown() {
3332        let executor = deterministic::Runner::default();
3333        test_shutdown(executor);
3334    }
3335
3336    #[test]
3337    fn test_deterministic_shutdown_multiple_signals() {
3338        let executor = deterministic::Runner::default();
3339        test_shutdown_multiple_signals(executor);
3340    }
3341
3342    #[test]
3343    fn test_deterministic_shutdown_timeout() {
3344        let executor = deterministic::Runner::default();
3345        test_shutdown_timeout(executor);
3346    }
3347
3348    #[test]
3349    fn test_deterministic_shutdown_multiple_stop_calls() {
3350        let executor = deterministic::Runner::default();
3351        test_shutdown_multiple_stop_calls(executor);
3352    }
3353
3354    #[test]
3355    fn test_deterministic_unfulfilled_shutdown() {
3356        let executor = deterministic::Runner::default();
3357        test_unfulfilled_shutdown(executor);
3358    }
3359
3360    #[test]
3361    fn test_deterministic_spawn_dedicated() {
3362        let executor = deterministic::Runner::default();
3363        test_spawn_dedicated(executor);
3364    }
3365
3366    #[test]
3367    fn test_deterministic_spawn() {
3368        let runner = deterministic::Runner::default();
3369        test_spawn(runner);
3370    }
3371
3372    #[test]
3373    fn test_deterministic_spawn_abort_on_parent_abort() {
3374        let runner = deterministic::Runner::default();
3375        test_spawn_abort_on_parent_abort(runner);
3376    }
3377
3378    #[test]
3379    fn test_deterministic_spawn_abort_on_parent_completion() {
3380        let runner = deterministic::Runner::default();
3381        test_spawn_abort_on_parent_completion(runner);
3382    }
3383
3384    #[test]
3385    fn test_deterministic_spawn_cascading_abort() {
3386        let runner = deterministic::Runner::default();
3387        test_spawn_cascading_abort(runner);
3388    }
3389
3390    #[test]
3391    fn test_deterministic_child_survives_sibling_completion() {
3392        let runner = deterministic::Runner::default();
3393        test_child_survives_sibling_completion(runner);
3394    }
3395
3396    #[test]
3397    fn test_deterministic_spawn_clone_chain() {
3398        let runner = deterministic::Runner::default();
3399        test_spawn_clone_chain(runner);
3400    }
3401
3402    #[test]
3403    fn test_deterministic_spawn_sparse_clone_chain() {
3404        let runner = deterministic::Runner::default();
3405        test_spawn_sparse_clone_chain(runner);
3406    }
3407
3408    #[test]
3409    fn test_deterministic_spawn_blocking() {
3410        for dedicated in [false, true] {
3411            let executor = deterministic::Runner::default();
3412            test_spawn_blocking(executor, dedicated);
3413        }
3414    }
3415
3416    #[test]
3417    #[should_panic(expected = "blocking task panicked")]
3418    fn test_deterministic_spawn_blocking_panic() {
3419        for dedicated in [false, true] {
3420            let executor = deterministic::Runner::default();
3421            test_spawn_blocking_panic(executor, dedicated);
3422        }
3423    }
3424
3425    #[test]
3426    fn test_deterministic_spawn_blocking_panic_caught() {
3427        for dedicated in [false, true] {
3428            let cfg = deterministic::Config::default().with_catch_panics(true);
3429            let executor = deterministic::Runner::new(cfg);
3430            test_spawn_blocking_panic_caught(executor, dedicated);
3431        }
3432    }
3433
3434    #[test]
3435    fn test_deterministic_spawn_blocking_abort() {
3436        for (dedicated, blocking) in [(false, true), (true, false)] {
3437            let executor = deterministic::Runner::default();
3438            test_spawn_abort(executor, dedicated, blocking);
3439        }
3440    }
3441
3442    #[test]
3443    fn test_deterministic_circular_reference_prevents_cleanup() {
3444        let executor = deterministic::Runner::default();
3445        test_circular_reference_prevents_cleanup(executor);
3446    }
3447
3448    #[test]
3449    fn test_deterministic_late_waker() {
3450        let executor = deterministic::Runner::default();
3451        test_late_waker(executor);
3452    }
3453
3454    #[test]
3455    fn test_deterministic_metrics() {
3456        let executor = deterministic::Runner::default();
3457        test_metrics(executor);
3458    }
3459
3460    #[test_collect_traces]
3461    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
3462        let executor = deterministic::Runner::new(deterministic::Config::default());
3463        executor.start(|context| async move {
3464            context
3465                .child("test")
3466                .with_span()
3467                .spawn(|context| async move {
3468                    tracing::info!(field = "test field", "test log");
3469
3470                    context
3471                        .child("inner")
3472                        .with_span()
3473                        .spawn(|_| async move {
3474                            tracing::info!("inner log");
3475                        })
3476                        .await
3477                        .unwrap();
3478                })
3479                .await
3480                .unwrap();
3481        });
3482
3483        let info_traces = traces.get_by_level(Level::INFO);
3484        assert_eq!(info_traces.len(), 2);
3485
3486        // Outer log (single span)
3487        info_traces
3488            .expect_event_at_index(0, |event| {
3489                event.metadata.expect_content_exact("test log")?;
3490                event.metadata.expect_field_count(1)?;
3491                event.metadata.expect_field_exact("field", "test field")?;
3492                event.expect_span_count(1)?;
3493                event.expect_span_at_index(0, |span| {
3494                    span.expect_content_exact("task")?;
3495                    span.expect_field_count(1)?;
3496                    span.expect_field_exact("name", "test")
3497                })
3498            })
3499            .unwrap();
3500
3501        info_traces
3502            .expect_event_at_index(1, |event| {
3503                event.metadata.expect_content_exact("inner log")?;
3504                event.metadata.expect_field_count(0)?;
3505                event.expect_span_count(1)?;
3506                event.expect_span_at_index(0, |span| {
3507                    span.expect_content_exact("task")?;
3508                    span.expect_field_count(1)?;
3509                    span.expect_field_exact("name", "test_inner")
3510                })
3511            })
3512            .unwrap();
3513    }
3514
3515    #[test]
3516    fn test_deterministic_resolver() {
3517        let executor = deterministic::Runner::default();
3518        executor.start(|context| async move {
3519            // Register DNS mappings
3520            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3521            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
3522            context.resolver_register("example.com", Some(vec![ip1, ip2]));
3523
3524            // Resolve registered hostname
3525            let addrs = context.resolve("example.com").await.unwrap();
3526            assert_eq!(addrs, vec![ip1, ip2]);
3527
3528            // Resolve unregistered hostname
3529            let result = context.resolve("unknown.com").await;
3530            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3531
3532            // Remove mapping
3533            context.resolver_register("example.com", None);
3534            let result = context.resolve("example.com").await;
3535            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3536        });
3537    }
3538
3539    #[test]
3540    fn test_tokio_error_future() {
3541        let runner = tokio::Runner::default();
3542        test_error_future(runner);
3543    }
3544
3545    #[test]
3546    fn test_tokio_clock_sleep() {
3547        let executor = tokio::Runner::default();
3548        test_clock_sleep(executor);
3549    }
3550
3551    #[test]
3552    fn test_tokio_clock_sleep_until() {
3553        let executor = tokio::Runner::default();
3554        test_clock_sleep_until(executor);
3555    }
3556
3557    #[test]
3558    fn test_tokio_clock_sleep_until_far_future() {
3559        let executor = tokio::Runner::default();
3560        test_clock_sleep_until_far_future(executor);
3561    }
3562
3563    #[test]
3564    fn test_tokio_clock_timeout() {
3565        let executor = tokio::Runner::default();
3566        test_clock_timeout(executor);
3567    }
3568
3569    #[test]
3570    fn test_tokio_root_finishes() {
3571        let executor = tokio::Runner::default();
3572        test_root_finishes(executor);
3573    }
3574
3575    #[test]
3576    fn test_tokio_spawn_after_abort() {
3577        let executor = tokio::Runner::default();
3578        test_spawn_after_abort(executor);
3579    }
3580
3581    #[test]
3582    fn test_tokio_spawn_abort() {
3583        let executor = tokio::Runner::default();
3584        test_spawn_abort(executor, false, false);
3585    }
3586
3587    #[test]
3588    #[should_panic(expected = "blah")]
3589    fn test_tokio_panic_aborts_root() {
3590        let executor = tokio::Runner::default();
3591        test_panic_aborts_root(executor);
3592    }
3593
3594    #[test]
3595    #[should_panic(expected = "blah")]
3596    fn test_tokio_panic_aborts_root_caught() {
3597        let cfg = tokio::Config::default().with_catch_panics(true);
3598        let executor = tokio::Runner::new(cfg);
3599        test_panic_aborts_root(executor);
3600    }
3601
3602    #[test]
3603    #[should_panic(expected = "blah")]
3604    fn test_tokio_panic_aborts_spawn() {
3605        let executor = tokio::Runner::default();
3606        test_panic_aborts_spawn(executor);
3607    }
3608
3609    #[test]
3610    fn test_tokio_panic_aborts_spawn_caught() {
3611        let cfg = tokio::Config::default().with_catch_panics(true);
3612        let executor = tokio::Runner::new(cfg);
3613        test_panic_aborts_spawn_caught(executor);
3614    }
3615
3616    #[test]
3617    #[should_panic(expected = "boom")]
3618    fn test_tokio_multiple_panics() {
3619        let executor = tokio::Runner::default();
3620        test_multiple_panics(executor);
3621    }
3622
3623    #[test]
3624    fn test_tokio_multiple_panics_caught() {
3625        let cfg = tokio::Config::default().with_catch_panics(true);
3626        let executor = tokio::Runner::new(cfg);
3627        test_multiple_panics_caught(executor);
3628    }
3629
3630    #[test]
3631    fn test_tokio_select() {
3632        let executor = tokio::Runner::default();
3633        test_select(executor);
3634    }
3635
3636    #[test]
3637    fn test_tokio_select_loop() {
3638        let executor = tokio::Runner::default();
3639        test_select_loop(executor);
3640    }
3641
3642    #[test]
3643    fn test_tokio_storage_operations() {
3644        let executor = tokio::Runner::default();
3645        test_storage_operations(executor);
3646    }
3647
3648    #[test]
3649    fn test_tokio_blob_read_write() {
3650        let executor = tokio::Runner::default();
3651        test_blob_read_write(executor);
3652    }
3653
3654    #[test]
3655    fn test_tokio_blob_resize() {
3656        let executor = tokio::Runner::default();
3657        test_blob_resize(executor);
3658    }
3659
3660    #[test]
3661    fn test_tokio_many_partition_read_write() {
3662        let executor = tokio::Runner::default();
3663        test_many_partition_read_write(executor);
3664    }
3665
3666    #[test]
3667    fn test_tokio_blob_read_past_length() {
3668        let executor = tokio::Runner::default();
3669        test_blob_read_past_length(executor);
3670    }
3671
3672    #[test]
3673    fn test_tokio_blob_clone_and_concurrent_read() {
3674        // Run test
3675        let executor = tokio::Runner::default();
3676        test_blob_clone_and_concurrent_read(executor);
3677    }
3678
3679    #[test]
3680    fn test_tokio_shutdown() {
3681        let executor = tokio::Runner::default();
3682        test_shutdown(executor);
3683    }
3684
3685    #[test]
3686    fn test_tokio_shutdown_multiple_signals() {
3687        let executor = tokio::Runner::default();
3688        test_shutdown_multiple_signals(executor);
3689    }
3690
3691    #[test]
3692    fn test_tokio_shutdown_timeout() {
3693        let executor = tokio::Runner::default();
3694        test_shutdown_timeout(executor);
3695    }
3696
3697    #[test]
3698    fn test_tokio_shutdown_multiple_stop_calls() {
3699        let executor = tokio::Runner::default();
3700        test_shutdown_multiple_stop_calls(executor);
3701    }
3702
3703    #[test]
3704    fn test_tokio_unfulfilled_shutdown() {
3705        let executor = tokio::Runner::default();
3706        test_unfulfilled_shutdown(executor);
3707    }
3708
3709    #[test]
3710    fn test_tokio_spawn_dedicated() {
3711        let executor = tokio::Runner::default();
3712        test_spawn_dedicated(executor);
3713    }
3714
3715    #[test]
3716    fn test_tokio_spawn() {
3717        let runner = tokio::Runner::default();
3718        test_spawn(runner);
3719    }
3720
3721    #[test]
3722    fn test_tokio_spawn_abort_on_parent_abort() {
3723        let runner = tokio::Runner::default();
3724        test_spawn_abort_on_parent_abort(runner);
3725    }
3726
3727    #[test]
3728    fn test_tokio_spawn_abort_on_parent_completion() {
3729        let runner = tokio::Runner::default();
3730        test_spawn_abort_on_parent_completion(runner);
3731    }
3732
3733    #[test]
3734    fn test_tokio_spawn_cascading_abort() {
3735        let runner = tokio::Runner::default();
3736        test_spawn_cascading_abort(runner);
3737    }
3738
3739    #[test]
3740    fn test_tokio_child_survives_sibling_completion() {
3741        let runner = tokio::Runner::default();
3742        test_child_survives_sibling_completion(runner);
3743    }
3744
3745    #[test]
3746    fn test_tokio_spawn_clone_chain() {
3747        let runner = tokio::Runner::default();
3748        test_spawn_clone_chain(runner);
3749    }
3750
3751    #[test]
3752    fn test_tokio_spawn_sparse_clone_chain() {
3753        let runner = tokio::Runner::default();
3754        test_spawn_sparse_clone_chain(runner);
3755    }
3756
3757    #[test]
3758    fn test_tokio_spawn_blocking() {
3759        for dedicated in [false, true] {
3760            let executor = tokio::Runner::default();
3761            test_spawn_blocking(executor, dedicated);
3762        }
3763    }
3764
3765    #[test]
3766    #[should_panic(expected = "blocking task panicked")]
3767    fn test_tokio_spawn_blocking_panic() {
3768        for dedicated in [false, true] {
3769            let executor = tokio::Runner::default();
3770            test_spawn_blocking_panic(executor, dedicated);
3771        }
3772    }
3773
3774    #[test]
3775    fn test_tokio_spawn_blocking_panic_caught() {
3776        for dedicated in [false, true] {
3777            let cfg = tokio::Config::default().with_catch_panics(true);
3778            let executor = tokio::Runner::new(cfg);
3779            test_spawn_blocking_panic_caught(executor, dedicated);
3780        }
3781    }
3782
3783    #[test]
3784    fn test_tokio_spawn_blocking_abort() {
3785        for (dedicated, blocking) in [(false, true), (true, false)] {
3786            let executor = tokio::Runner::default();
3787            test_spawn_abort(executor, dedicated, blocking);
3788        }
3789    }
3790
3791    #[test]
3792    fn test_tokio_circular_reference_prevents_cleanup() {
3793        let executor = tokio::Runner::default();
3794        test_circular_reference_prevents_cleanup(executor);
3795    }
3796
3797    #[test]
3798    fn test_tokio_late_waker() {
3799        let executor = tokio::Runner::default();
3800        test_late_waker(executor);
3801    }
3802
3803    #[test]
3804    fn test_tokio_metrics() {
3805        let executor = tokio::Runner::default();
3806        test_metrics(executor);
3807    }
3808
3809    #[test]
3810    fn test_tokio_process_rss_metric() {
3811        let executor = tokio::Runner::default();
3812        executor.start(|context| async move {
3813            loop {
3814                // Wait for RSS metric to be available
3815                let metrics = context.encode();
3816                if !metrics.contains("runtime_process_rss") {
3817                    context.sleep(Duration::from_millis(100)).await;
3818                    continue;
3819                }
3820
3821                // Verify the RSS value is eventually populated (greater than 0)
3822                for line in metrics.lines() {
3823                    if line.starts_with("runtime_process_rss")
3824                        && !line.starts_with("runtime_process_rss{")
3825                    {
3826                        let parts: Vec<&str> = line.split_whitespace().collect();
3827                        if parts.len() >= 2 {
3828                            let rss_value: i64 =
3829                                parts[1].parse().expect("Failed to parse RSS value");
3830                            if rss_value > 0 {
3831                                return;
3832                            }
3833                        }
3834                    }
3835                }
3836            }
3837        });
3838    }
3839
3840    #[test]
3841    fn test_tokio_telemetry() {
3842        let executor = tokio::Runner::default();
3843        executor.start(|context| async move {
3844            // Define the server address
3845            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
3846
3847            // Configure telemetry
3848            tokio::telemetry::init(
3849                context.child("metrics"),
3850                tokio::telemetry::Logging {
3851                    level: Level::INFO,
3852                    json: false,
3853                },
3854                Some(address),
3855                None,
3856            );
3857
3858            // Register a test metric
3859            let counter: Counter<u64> = Counter::default();
3860            let _registered = context.register("test_counter", "Test counter", counter.clone());
3861            counter.inc();
3862
3863            // Helper functions to parse HTTP response
3864            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
3865                let mut line = Vec::new();
3866                loop {
3867                    let received = stream.recv(1).await?;
3868                    let byte = received.coalesce().as_ref()[0];
3869                    if byte == b'\n' {
3870                        if line.last() == Some(&b'\r') {
3871                            line.pop(); // Remove trailing \r
3872                        }
3873                        break;
3874                    }
3875                    line.push(byte);
3876                }
3877                String::from_utf8(line).map_err(|_| Error::ReadFailed)
3878            }
3879
3880            async fn read_headers<St: Stream>(
3881                stream: &mut St,
3882            ) -> Result<HashMap<String, String>, Error> {
3883                let mut headers = HashMap::new();
3884                loop {
3885                    let line = read_line(stream).await?;
3886                    if line.is_empty() {
3887                        break;
3888                    }
3889                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
3890                    if parts.len() == 2 {
3891                        headers.insert(parts[0].to_string(), parts[1].to_string());
3892                    }
3893                }
3894                Ok(headers)
3895            }
3896
3897            async fn read_body<St: Stream>(
3898                stream: &mut St,
3899                content_length: usize,
3900            ) -> Result<String, Error> {
3901                let received = stream.recv(content_length).await?;
3902                String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
3903            }
3904
3905            // Simulate a client connecting to the server
3906            let client_handle = context.child("client").spawn(move |context| async move {
3907                let (mut sink, mut stream) = loop {
3908                    match context.dial(address).await {
3909                        Ok((sink, stream)) => break (sink, stream),
3910                        Err(e) => {
3911                            // The client may be polled before the server is ready, that's alright!
3912                            error!(err =?e, "failed to connect");
3913                            context.sleep(Duration::from_millis(10)).await;
3914                        }
3915                    }
3916                };
3917
3918                // Send a GET request to the server
3919                let request = format!(
3920                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
3921                );
3922                sink.send(Bytes::from(request)).await.unwrap();
3923
3924                // Read and verify the HTTP status line
3925                let status_line = read_line(&mut stream).await.unwrap();
3926                assert_eq!(status_line, "HTTP/1.1 200 OK");
3927
3928                // Read and parse headers
3929                let headers = read_headers(&mut stream).await.unwrap();
3930                println!("Headers: {headers:?}");
3931                let content_length = headers
3932                    .get("content-length")
3933                    .unwrap()
3934                    .parse::<usize>()
3935                    .unwrap();
3936
3937                // Read and verify the body
3938                let body = read_body(&mut stream, content_length).await.unwrap();
3939                assert!(body.contains("test_counter_total 1"));
3940            });
3941
3942            // Wait for the client task to complete
3943            client_handle.await.unwrap();
3944        });
3945    }
3946
3947    #[test]
3948    fn test_tokio_resolver() {
3949        let executor = tokio::Runner::default();
3950        executor.start(|context| async move {
3951            let addrs = context.resolve("localhost").await.unwrap();
3952            assert!(!addrs.is_empty());
3953            for addr in addrs {
3954                assert!(
3955                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
3956                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
3957                );
3958            }
3959        });
3960    }
3961
3962    #[test]
3963    fn test_create_thread_pool_tokio() {
3964        let executor = tokio::Runner::default();
3965        executor.start(|context| async move {
3966            // Create a thread pool with 4 threads
3967            let pool = context
3968                .child("pool")
3969                .create_thread_pool(NZUsize!(4))
3970                .unwrap();
3971
3972            // Create a vector of numbers
3973            let v: Vec<_> = (0..10000).collect();
3974
3975            // Use the thread pool to sum the numbers
3976            pool.install(|| {
3977                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3978            });
3979        });
3980    }
3981
3982    #[test]
3983    fn test_create_thread_pool_deterministic() {
3984        let executor = deterministic::Runner::default();
3985        executor.start(|context| async move {
3986            // Create a thread pool with 4 threads
3987            let pool = context
3988                .child("pool")
3989                .create_thread_pool(NZUsize!(4))
3990                .unwrap();
3991
3992            // Create a vector of numbers
3993            let v: Vec<_> = (0..10000).collect();
3994
3995            // Use the thread pool to sum the numbers
3996            pool.install(|| {
3997                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3998            });
3999        });
4000    }
4001
4002    fn test_buffer_pooler<R: Runner>(
4003        runner: R,
4004        expected_network_max_per_class: u32,
4005        expected_storage_max_per_class: u32,
4006    ) where
4007        R::Context: BufferPooler,
4008    {
4009        runner.start(|context| async move {
4010            // Verify network pool is accessible and works (cache-line aligned)
4011            let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
4012            assert!(net_buf.capacity() >= 1024);
4013
4014            // Verify storage pool is accessible and works (page-aligned)
4015            let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
4016            assert!(storage_buf.capacity() >= 4096);
4017
4018            // Verify pools have expected configurations
4019            assert_eq!(
4020                context.network_buffer_pool().config().max_per_class.get(),
4021                expected_network_max_per_class
4022            );
4023            assert_eq!(
4024                context.storage_buffer_pool().config().max_per_class.get(),
4025                expected_storage_max_per_class
4026            );
4027        });
4028    }
4029
4030    #[test]
4031    fn test_deterministic_buffer_pooler() {
4032        test_buffer_pooler(deterministic::Runner::default(), 4096, 64);
4033
4034        let runner = deterministic::Runner::new(
4035            deterministic::Config::default()
4036                .with_network_buffer_pool_config(
4037                    BufferPoolConfig::for_network().with_max_per_class(NZU32!(64)),
4038                )
4039                .with_storage_buffer_pool_config(
4040                    BufferPoolConfig::for_storage().with_max_per_class(NZU32!(8)),
4041                ),
4042        );
4043        test_buffer_pooler(runner, 64, 8);
4044    }
4045
4046    #[test]
4047    fn test_tokio_buffer_pooler() {
4048        test_buffer_pooler(tokio::Runner::default(), 4096, 64);
4049
4050        let runner = tokio::Runner::new(
4051            tokio::Config::default()
4052                .with_network_buffer_pool_config(
4053                    BufferPoolConfig::for_network().with_max_per_class(NZU32!(64)),
4054                )
4055                .with_storage_buffer_pool_config(
4056                    BufferPoolConfig::for_storage().with_max_per_class(NZU32!(8)),
4057                ),
4058        );
4059        test_buffer_pooler(runner, 64, 8);
4060    }
4061}