// commonware_runtime/lib.rs
//! Execute asynchronous tasks with a configurable scheduler.
//!
//! This crate provides a collection of runtimes that can be
//! used to execute asynchronous tasks in a variety of ways. For production use,
//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
//! For testing and simulation, the `deterministic` module provides a runtime
//! that allows for deterministic execution of tasks (given a fixed seed).
//!
//! # Terminology
//!
//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
//! other traits to provide core functionality.
//!
//! # Status
//!
//! Stability varies by primitive. See [README](https://github.com/commonwarexyz/monorepo#stability) for details.

#![doc(
    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
    html_favicon_url = "https://commonware.xyz/favicon.ico"
)]

use commonware_macros::stability_scope;

#[macro_use]
mod macros;

mod network;
mod process;
mod storage;
// Alpha-stability primitives: deterministic runtime and mocks for testing/simulation.
stability_scope!(ALPHA {
    pub mod deterministic;
    pub mod mocks;
});
// Benchmarks are not available on wasm32 targets.
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
    pub mod benchmarks;
});
// io_uring backend, compiled only when an iouring feature is enabled.
stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
    mod iouring;
});
// Beta-stability production runtime backed by Tokio (not available on wasm32).
stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
    pub mod tokio;
});
stability_scope!(BETA {
    use commonware_macros::select;
    use commonware_parallel::{Rayon, ThreadPool};
    use iobuf::PoolError;
    use prometheus_client::registry::Metric;
    use rayon::ThreadPoolBuildError;
    use std::{
        future::Future,
        io::Error as IoError,
        net::SocketAddr,
        num::NonZeroUsize,
        time::{Duration, SystemTime},
    };
    use thiserror::Error;

    /// Prefix for runtime metrics.
    ///
    /// Reserved: implementations of [Metrics::with_label] must reject labels
    /// starting with this prefix (see the [Metrics] docs).
    pub(crate) const METRICS_PREFIX: &str = "runtime";

    /// Re-export of `Buf` and `BufMut` traits for usage with [I/O buffers](iobuf).
    pub use bytes::{Buf, BufMut};
    /// Re-export of [governor::Quota] for rate limiting configuration.
    pub use governor::Quota;

    pub mod iobuf;
    pub use iobuf::{
        BufferPool, BufferPoolConfig, BufferPoolThreadCache, Builder as IoBufsBuilder, IoBuf,
        IoBufMut, IoBufs, IoBufsMut,
    };

    pub mod utils;
    pub use utils::*;

    pub mod telemetry;

    /// Default [`Blob`] version used when no version is specified via [`Storage::open`].
    pub const DEFAULT_BLOB_VERSION: u16 = 0;
82
    /// Errors that can occur when interacting with the runtime.
    #[derive(Error, Debug)]
    pub enum Error {
        // --- Task lifecycle errors ---
        #[error("exited")]
        Exited,
        #[error("closed")]
        Closed,
        #[error("timeout")]
        Timeout,
        // --- Network errors ---
        #[error("bind failed")]
        BindFailed,
        #[error("connection failed")]
        ConnectionFailed,
        #[error("write failed")]
        WriteFailed,
        #[error("read failed")]
        ReadFailed,
        #[error("send failed")]
        SendFailed,
        #[error("recv failed")]
        RecvFailed,
        #[error("dns resolution failed: {0}")]
        ResolveFailed(String),
        // --- Storage: partition errors (payload is the partition name) ---
        #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
        PartitionNameInvalid(String),
        #[error("partition creation failed: {0}")]
        PartitionCreationFailed(String),
        #[error("partition missing: {0}")]
        PartitionMissing(String),
        #[error("partition corrupt: {0}")]
        PartitionCorrupt(String),
        // --- Storage: blob errors (payloads are partition, blob name, then detail) ---
        #[error("blob open failed: {0}/{1} error: {2}")]
        BlobOpenFailed(String, String, IoError),
        #[error("blob missing: {0}/{1}")]
        BlobMissing(String, String),
        #[error("blob resize failed: {0}/{1} error: {2}")]
        BlobResizeFailed(String, String, IoError),
        #[error("blob sync failed: {0}/{1} error: {2}")]
        BlobSyncFailed(String, String, IoError),
        #[error("blob insufficient length")]
        BlobInsufficientLength,
        #[error("blob corrupt: {0}/{1} reason: {2}")]
        BlobCorrupt(String, String, String),
        #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
        BlobVersionMismatch {
            expected: std::ops::RangeInclusive<u16>,
            found: u16,
        },
        // --- Miscellaneous ---
        #[error("invalid or missing checksum")]
        InvalidChecksum,
        #[error("offset overflow")]
        OffsetOverflow,
        #[error("io error: {0}")]
        Io(#[from] IoError),
        #[error("buffer pool: {0}")]
        Pool(#[from] PoolError),
    }
140
    /// Interface that any task scheduler must implement to start
    /// running tasks.
    pub trait Runner {
        /// Context defines the environment available to tasks.
        /// Example of possible services provided by the context include:
        /// - [Clock] for time-based operations
        /// - [Network] for network operations
        /// - [Storage] for storage operations
        type Context;

        /// Start running a root task.
        ///
        /// Consumes the runner; `f` receives the root [Runner::Context] and its
        /// future is driven to completion, returning the future's output.
        ///
        /// When this function returns, all spawned tasks will be canceled. If clean
        /// shutdown cannot be implemented via `Drop`, consider using [Spawner::stop] and
        /// [Spawner::stopped] to coordinate clean shutdown.
        fn start<F, Fut>(self, f: F) -> Fut::Output
        where
            F: FnOnce(Self::Context) -> Fut,
            Fut: Future;
    }
161
    /// Interface that any task scheduler must implement to spawn tasks.
    pub trait Spawner: Clone + Send + Sync + 'static {
        /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
        ///
        /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
        /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
        /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
        /// [`Spawner::dedicated`] instead.
        ///
        /// The shared executor with `blocking == false` is the default spawn mode.
        fn shared(self, blocking: bool) -> Self;

        /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
        ///
        /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
        /// shared executor.
        ///
        /// This is not the default behavior. See [`Spawner::shared`] for more information.
        fn dedicated(self) -> Self;

        /// Spawn a task with the current context.
        ///
        /// Unlike directly awaiting a future, the task starts running immediately even if the caller
        /// never awaits the returned [`Handle`].
        ///
        /// # Mandatory Supervision
        ///
        /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
        ///
        /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
        /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
        ///
        /// ```txt
        /// ctx_a
        ///   |
        ///   +-- clone() ---> ctx_c
        ///   |                  |
        ///   |                  +-- spawn() ---> Task C (ctx_d)
        ///   |
        ///   +-- spawn() ---> Task A (ctx_b)
        ///                              |
        ///                              +-- spawn() ---> Task B (ctx_e)
        ///
        /// Task A finishes or aborts --> Task B and Task C are aborted
        /// ```
        ///
        /// # Spawn Configuration
        ///
        /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
        /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
        ///
        /// Child tasks should assume they start from a clean configuration without needing to inspect how their
        /// parent was configured.
        fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
        where
            F: FnOnce(Self) -> Fut + Send + 'static,
            Fut: Future<Output = T> + Send + 'static,
            T: Send + 'static;

        /// Signals the runtime to stop execution and waits for all outstanding tasks
        /// to perform any required cleanup and exit.
        ///
        /// This method does not actually kill any tasks but rather signals to them, using
        /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
        /// It then waits for all [signal::Signal] references to be dropped before returning.
        ///
        /// ## Multiple Stop Calls
        ///
        /// This method is idempotent and safe to call multiple times concurrently (on
        /// different instances of the same context since it consumes `self`). The first
        /// call initiates shutdown with the provided `value`, and all subsequent calls
        /// will wait for the same completion regardless of their `value` parameter, i.e.
        /// the original `value` from the first call is preserved.
        ///
        /// ## Timeout
        ///
        /// If a timeout is provided, the method will return an error if all [signal::Signal]
        /// references have not been dropped within the specified duration.
        fn stop(
            self,
            value: i32,
            timeout: Option<Duration>,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
        /// any task.
        ///
        /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
        /// immediately. The [signal::Signal] returned will always resolve to the value of the
        /// first [Spawner::stop] call.
        fn stopped(&self) -> signal::Signal;
    }
254
    /// Trait for creating [rayon]-compatible thread pools with each worker thread
    /// placed on dedicated threads via [Spawner].
    pub trait ThreadPooler: Spawner + Metrics {
        /// Creates a clone-able [rayon]-compatible thread pool with [Spawner::spawn].
        ///
        /// # Arguments
        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
        ///
        /// # Returns
        /// A `Result` containing the configured [rayon::ThreadPool] or a [rayon::ThreadPoolBuildError] if the pool cannot
        /// be built.
        fn create_thread_pool(
            &self,
            concurrency: NonZeroUsize,
        ) -> Result<ThreadPool, ThreadPoolBuildError>;

        /// Creates a clone-able [Rayon] strategy for use with [commonware_parallel].
        ///
        /// # Arguments
        /// - `concurrency`: The number of tasks to execute concurrently in the pool.
        ///
        /// # Returns
        /// A `Result` containing the configured [Rayon] strategy or a [rayon::ThreadPoolBuildError] if the pool cannot be
        /// built.
        fn create_strategy(
            &self,
            concurrency: NonZeroUsize,
        ) -> Result<Rayon, ThreadPoolBuildError> {
            // Default implementation: build the pool, then wrap it in a [Rayon] strategy.
            self.create_thread_pool(concurrency).map(Rayon::with_pool)
        }
    }
286
    /// Interface to register and encode metrics.
    ///
    /// # Mental Model: Metrics vs Tracing
    ///
    /// A context carries multiple pieces of observability state, each
    /// applied via a different builder. They compose freely, but they do
    /// not all feed into the same sinks:
    ///
    /// - `name` (set by [`Metrics::with_label`]): prefix applied to metrics
    ///   you [`register`](Metrics::register) on this context; also populates
    ///   the `name` field of runtime-internal task metrics
    ///   (`runtime_tasks_spawned`, `runtime_tasks_running`).
    /// - `attributes` (set by [`Metrics::with_attribute`]): Prometheus label
    ///   dimensions on metrics you `register` on this context; also emitted
    ///   as OpenTelemetry attributes on the per-task tracing span when, and
    ///   only when, [`Metrics::with_span`] is set on the spawn. Runtime
    ///   task metrics intentionally ignore attributes to keep their
    ///   cardinality bounded.
    /// - `scope` (set by [`Metrics::with_scope`]): routes metrics you
    ///   `register` on this context into a sub-registry whose entries are
    ///   removed when the last clone of the scoped context is dropped. It
    ///   does not affect runtime task metrics and has no effect on tracing.
    /// - `span` (set by [`Metrics::with_span`]): wraps the next spawned task
    ///   in a `tracing` span populated from the current `name` and `attributes`.
    ///   It never touches metrics. The flag is consumed by the next `spawn` and
    ///   is not inherited by child contexts.
    ///
    /// | Builder                 | Registered metric name | Registered metric labels | Registry isolation | Runtime task metrics | Tracing span                     |
    /// | ----------------------- | :--------------------: | :----------------------: | :----------------: | :------------------: | :------------------------------: |
    /// | `with_label`            | prefix                 | -                        | -                  | `name`               | `name` field when `with_span`    |
    /// | `with_attribute`        | -                      | label dimension          | -                  | -                    | OTel attribute when `with_span`  |
    /// | `with_scope`            | -                      | -                        | sub-registry       | -                    | -                                |
    /// | `with_span`             | -                      | -                        | -                  | -                    | enables span creation            |
    pub trait Metrics: Clone + Send + Sync + 'static {
        /// Get the current label of the context.
        fn label(&self) -> String;

        /// Create a new instance of `Metrics` with the given label appended to the end
        /// of the current `Metrics` label.
        ///
        /// This is commonly used to create a nested context for `register`.
        ///
        /// Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. It is not permitted for
        /// any implementation to use `METRICS_PREFIX` as the start of a label (reserved for metrics for the runtime).
        fn with_label(&self, label: &str) -> Self;

        /// Create a new instance of `Metrics` with an additional attribute (key-value pair) applied
        /// to all metrics registered in this context and any child contexts.
        ///
        /// Unlike [`Metrics::with_label`] which affects the metric name prefix, `with_attribute` adds
        /// a key-value pair that appears as a separate dimension in the metric output. This is
        /// useful for instrumenting n-ary data structures in a way that is easy to manage downstream.
        ///
        /// Keys must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`. Values can be any string.
        ///
        /// # Labeling Children
        ///
        /// Attributes apply to the entire subtree of contexts. When you call `with_attribute`, the
        /// label is automatically added to all metrics registered in that context and any child
        /// contexts created via `with_label`:
        ///
        /// ```text
        /// context
        ///   |-- with_label("orchestrator")
        ///         |-- with_attribute("epoch", "5")
        ///               |-- counter: votes        -> orchestrator_votes{epoch="5"}
        ///               |-- counter: proposals    -> orchestrator_proposals{epoch="5"}
        ///               |-- with_label("engine")
        ///                     |-- gauge: height   -> orchestrator_engine_height{epoch="5"}
        /// ```
        ///
        /// This pattern avoids wrapping every metric in a `Family` and avoids polluting metric
        /// names with dynamic values like `orchestrator_epoch_5_votes`.
        ///
        /// _Using attributes does not reduce cardinality (N epochs still means N time series).
        /// Attributes just make metrics easier to query, filter, and aggregate._
        ///
        /// # Family Label Conflicts
        ///
        /// When using `Family` metrics, avoid using attribute keys that match the Family's label field names.
        /// If a conflict occurs, the encoded output will contain duplicate labels (e.g., `{env="prod",env="staging"}`),
        /// which is invalid Prometheus format and may cause scraping issues.
        ///
        /// ```ignore
        /// #[derive(EncodeLabelSet)]
        /// struct Labels { env: String }
        ///
        /// // BAD: attribute "env" conflicts with Family field "env"
        /// let ctx = context.with_attribute("env", "prod");
        /// let family: Family<Labels, Counter> = Family::default();
        /// ctx.register("requests", "help", family);
        /// // Produces invalid: requests_total{env="prod",env="staging"}
        ///
        /// // GOOD: use distinct names
        /// let ctx = context.with_attribute("region", "us_east");
        /// // Produces valid: requests_total{region="us_east",env="staging"}
        /// ```
        ///
        /// # Example
        ///
        /// ```ignore
        /// // Instead of creating epoch-specific metric names:
        /// let ctx = context.with_label(&format!("consensus_engine_{}", epoch));
        /// // Produces: consensus_engine_5_votes_total, consensus_engine_6_votes_total, ...
        ///
        /// // Use attributes to add epoch as a label dimension:
        /// let ctx = context.with_label("consensus_engine").with_attribute("epoch", epoch);
        /// // Produces: consensus_engine_votes_total{epoch="5"}, consensus_engine_votes_total{epoch="6"}, ...
        /// ```
        ///
        /// Multiple attributes can be chained:
        /// ```ignore
        /// let ctx = context
        ///     .with_label("engine")
        ///     .with_attribute("region", "us_east")
        ///     .with_attribute("instance", "i1");
        /// // Produces: engine_requests_total{region="us_east",instance="i1"} 42
        /// ```
        ///
        /// # Querying The Latest Attribute
        ///
        /// To query the latest attribute value dynamically, create a gauge to track the current value:
        /// ```ignore
        /// // Create a gauge to track the current epoch
        /// let latest_epoch = Gauge::<i64>::default();
        /// context.with_label("orchestrator").register("latest_epoch", "current epoch", latest_epoch.clone());
        /// latest_epoch.set(current_epoch);
        /// // Produces: orchestrator_latest_epoch 5
        /// ```
        ///
        /// Then create a dashboard variable `$latest_epoch` with query `max(orchestrator_latest_epoch)`
        /// and use it in panel queries: `consensus_engine_votes_total{epoch="$latest_epoch"}`
        fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;

        /// Create a scoped context for metrics with a bounded lifetime (e.g., per-epoch
        /// consensus engines). All metrics registered through the returned context (and
        /// child contexts via [`Metrics::with_label`]/[`Metrics::with_attribute`]) go into
        /// a separate registry that is automatically removed when all clones of the scoped
        /// context are dropped.
        ///
        /// If the context is already scoped, returns a clone with the same scope (scopes
        /// nest by inheritance, not by creating new independent scopes).
        ///
        /// # Uniqueness
        ///
        /// Scoped metrics share the same global uniqueness constraint as
        /// [`Metrics::register`]: each (prefixed_name, attributes) pair must be unique.
        /// Callers should use [`Metrics::with_attribute`] to distinguish metrics across
        /// scopes (e.g., an "epoch" attribute) rather than re-registering identical keys.
        ///
        /// # Example
        ///
        /// ```ignore
        /// let scoped = context
        ///     .with_label("engine")
        ///     .with_attribute("epoch", epoch)
        ///     .with_scope();
        ///
        /// // Register metrics into the scoped registry
        /// let counter = Counter::default();
        /// scoped.register("votes", "vote count", counter.clone());
        ///
        /// // Metrics are removed when all clones of `scoped` are dropped.
        /// ```
        fn with_scope(&self) -> Self;

        /// Return a new context that wraps the next task spawned from it in a `tracing`
        /// span whose `name` field and OpenTelemetry attributes are derived from the
        /// context's label and attributes.
        ///
        /// The flag is consumed by the next [`Spawner::spawn`] call on the returned
        /// context (and on any clone of it). It is not inherited by child contexts
        /// produced via [`Metrics::with_label`], [`Metrics::with_attribute`],
        /// [`Metrics::with_scope`], or any of the `Spawner` builders: each such builder
        /// resets the flag so the caller must opt in again for the next spawn.
        ///
        /// Enabling the span only affects tracing; it does not change which metrics
        /// are registered, nor does it widen the cardinality of runtime task metrics.
        fn with_span(&self) -> Self;

        /// Register a metric with the runtime.
        ///
        /// Any registered metric will include (as a prefix) the label of the current context.
        ///
        /// Names must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`.
        fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);

        /// Encode all metrics into a buffer.
        ///
        /// To ensure downstream analytics tools work correctly, users must never duplicate metrics
        /// (via the concatenation of nested `with_label` and `register` calls). This can be
        /// avoided by using `with_label` and `with_attribute` to create new context instances
        /// (ensures all context instances are namespaced).
        fn encode(&self) -> String;
    }
482
    /// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating single-entity rate limiters.
    /// For per-key rate limiting, use [KeyedRateLimiter].
    pub type RateLimiter<C> = governor::RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
493
    /// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
    ///
    /// This is a convenience type alias for creating per-peer rate limiters
    /// using governor's [HashMapStateStore].
    ///
    /// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
    pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
        K,
        governor::state::keyed::HashMapStateStore<K>,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
506
    /// Interface that any task scheduler must implement to provide
    /// time-based operations.
    ///
    /// It is necessary to mock time to provide deterministic execution
    /// of arbitrary tasks.
    pub trait Clock:
        governor::clock::Clock<Instant = SystemTime>
        + governor::clock::ReasonablyRealtime
        + Clone
        + Send
        + Sync
        + 'static
    {
        /// Returns the current time.
        fn current(&self) -> SystemTime;

        /// Sleep for the given duration.
        fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

        /// Sleep until the given deadline.
        fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

        /// Await a future with a timeout, returning `Error::Timeout` if it expires.
        ///
        /// # Examples
        ///
        /// ```
        /// use std::time::Duration;
        /// use commonware_runtime::{deterministic, Error, Runner, Clock};
        ///
        /// let executor = deterministic::Runner::default();
        /// executor.start(|context| async move {
        ///     match context
        ///         .timeout(Duration::from_millis(100), async { 42 })
        ///         .await
        ///     {
        ///         Ok(value) => assert_eq!(value, 42),
        ///         Err(Error::Timeout) => panic!("should not timeout"),
        ///         Err(e) => panic!("unexpected error: {:?}", e),
        ///     }
        /// });
        /// ```
        fn timeout<F, T>(
            &self,
            duration: Duration,
            future: F,
        ) -> impl Future<Output = Result<T, Error>> + Send + '_
        where
            F: Future<Output = T> + Send + 'static,
            T: Send + 'static,
        {
            async move {
                // Race the future against a sleep; the future arm is listed first so
                // a result that is ready at the same time as the deadline wins.
                select! {
                    result = future => Ok(result),
                    _ = self.sleep(duration) => Err(Error::Timeout),
                }
            }
        }
    }
566
567    /// Syntactic sugar for the type of [Sink] used by a given [Network] N.
568    pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
569
570    /// Syntactic sugar for the type of [Stream] used by a given [Network] N.
571    pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
572
573    /// Syntactic sugar for the type of [Listener] used by a given [Network] N.
574    pub type ListenerOf<N> = <N as crate::Network>::Listener;
575
    /// Interface that any runtime must implement to create
    /// network connections.
    pub trait Network: Clone + Send + Sync + 'static {
        /// The type of [Listener] that's returned when binding to a socket.
        /// Accepting a connection returns a [Sink] and [Stream] which are defined
        /// by the [Listener] and used to send and receive data over the connection.
        type Listener: Listener;

        /// Bind to the given socket address.
        ///
        /// Returns a [Listener] for accepting inbound connections, or
        /// [Error::BindFailed] style errors on failure.
        fn bind(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

        /// Dial the given socket address.
        ///
        /// On success, returns the connection's [Sink] (for sending) and
        /// [Stream] (for receiving).
        fn dial(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
    }
596
    /// Interface for DNS resolution.
    pub trait Resolver: Clone + Send + Sync + 'static {
        /// Resolve a hostname to IP addresses.
        ///
        /// Returns a list of IP addresses that the hostname resolves to.
        fn resolve(
            &self,
            host: &str,
        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
    }
607
    /// Interface that any runtime must implement to handle
    /// incoming network connections.
    pub trait Listener: Sync + Send + 'static {
        /// The type of [Sink] that's returned when accepting a connection.
        /// This is used to send data to the remote connection.
        type Sink: Sink;
        /// The type of [Stream] that's returned when accepting a connection.
        /// This is used to receive data from the remote connection.
        type Stream: Stream;

        /// Accept an incoming connection.
        ///
        /// Returns the remote peer's address along with the connection's
        /// [Sink] and [Stream].
        fn accept(
            &mut self,
        ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

        /// Returns the local address of the listener.
        fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
    }
626
    /// Interface that any runtime must implement to send
    /// messages over a network connection.
    pub trait Sink: Sync + Send + 'static {
        /// Send a message to the sink.
        ///
        /// # Warning
        ///
        /// If the sink returns an error, part of the message may still be delivered.
        fn send(
            &mut self,
            bufs: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
640
    /// Interface that any runtime must implement to receive
    /// messages over a network connection.
    pub trait Stream: Sync + Send + 'static {
        /// Receive exactly `len` bytes from the stream.
        ///
        /// The runtime allocates the buffer and returns it as `IoBufs`.
        ///
        /// # Warning
        ///
        /// If the stream returns an error, partially read data may be discarded.
        fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;

        /// Peek at buffered data without consuming.
        ///
        /// Returns up to `max_len` bytes from the internal buffer, or an empty slice
        /// if no data is currently buffered. This does not perform any I/O or block.
        ///
        /// This is useful e.g. for parsing length prefixes without committing to a read
        /// or paying the cost of async.
        fn peek(&self, max_len: usize) -> &[u8];
    }
662
    /// Interface to interact with storage.
    ///
    /// To support storage implementations that enable concurrent reads and
    /// writes, blobs are responsible for maintaining synchronization.
    ///
    /// Storage can be backed by a local filesystem, cloud storage, etc.
    ///
    /// # Partition Names
    ///
    /// Partition names must be non-empty and contain only ASCII alphanumeric
    /// characters, dashes (`-`), or underscores (`_`). Names containing other
    /// characters (e.g., `/`, `.`, spaces) will return an error.
    pub trait Storage: Clone + Send + Sync + 'static {
        /// The readable/writeable storage buffer that can be opened by this Storage.
        type Blob: Blob;

        /// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
        /// in the versions range. The blob version is omitted from the return value.
        ///
        /// Returns the blob and its logical size in bytes.
        fn open(
            &self,
            partition: &str,
            name: &[u8],
        ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
            async move {
                // Delegate to the versioned variant, pinning both ends of the
                // range to the default version and discarding the returned
                // version number.
                let (blob, size, _) = self
                    .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
                    .await?;
                Ok((blob, size))
            }
        }

        /// Open an existing blob in a given partition or create a new one, returning
        /// the blob and its length.
        ///
        /// Multiple instances of the same blob can be opened concurrently, however,
        /// writing to the same blob concurrently may lead to undefined behavior.
        ///
        /// An Ok result indicates the blob is durably created (or already exists).
        ///
        /// # Versions
        ///
        /// Blobs are versioned. If the blob's version is not in `versions`, returns
        /// [Error::BlobVersionMismatch].
        ///
        /// # Returns
        ///
        /// A tuple of (blob, logical_size, blob_version).
        fn open_versioned(
            &self,
            partition: &str,
            name: &[u8],
            versions: std::ops::RangeInclusive<u16>,
        ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;

        /// Remove a blob from a given partition.
        ///
        /// If no `name` is provided, the entire partition is removed.
        ///
        /// An Ok result indicates the blob is durably removed.
        fn remove(
            &self,
            partition: &str,
            name: Option<&[u8]>,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Return all blobs in a given partition.
        ///
        /// NOTE(review): no ordering of the returned names is specified by this
        /// trait; callers should not rely on any particular order.
        fn scan(&self, partition: &str)
            -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
    }
732
    /// Interface to read and write to a blob.
    ///
    /// To support blob implementations that enable concurrent reads and
    /// writes, blobs are responsible for maintaining synchronization.
    ///
    /// Cloning a blob is similar to wrapping a single file descriptor in
    /// a lock whereas opening a new blob (of the same name) is similar to
    /// opening a new file descriptor. If multiple blobs are opened with the same
    /// name, they are not expected to coordinate access to underlying storage
    /// and writing to both is undefined behavior.
    ///
    /// When a blob is dropped, any unsynced changes may be discarded. Implementations
    /// may attempt to sync during drop but errors will go unhandled. Call `sync`
    /// before dropping to ensure all changes are durably persisted.
    #[allow(clippy::len_without_is_empty)]
    pub trait Blob: Clone + Send + Sync + 'static {
        /// Read `len` bytes at `offset` into caller-provided buffer(s).
        ///
        /// The caller provides the buffer(s), and the implementation fills it with
        /// exactly `len` bytes of data read from the blob starting at `offset`.
        /// Returns the same buffer(s), filled with data.
        ///
        /// # Contract
        ///
        /// - The returned buffers reuse caller-provided storage, with exactly `len`
        ///   bytes filled from `offset`.
        /// - Caller-provided chunk layout is preserved.
        ///
        /// # Panics
        ///
        /// Panics if `len` exceeds the total capacity of `bufs`.
        fn read_at_buf(
            &self,
            offset: u64,
            len: usize,
            bufs: impl Into<IoBufsMut> + Send,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Read `len` bytes at `offset`, returning a buffer(s) with exactly `len` bytes
        /// of data read from the blob starting at `offset`.
        ///
        /// To reuse a buffer(s), use [`Blob::read_at_buf`].
        fn read_at(
            &self,
            offset: u64,
            len: usize,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Write `bufs` to the blob at the given offset.
        ///
        /// A successful write is not guaranteed to be durable until [`Blob::sync`]
        /// returns Ok (unsynced changes may be discarded on drop).
        fn write_at(
            &self,
            offset: u64,
            bufs: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Resize the blob to the given length.
        ///
        /// If the length is greater than the current length, the blob is extended with zeros.
        /// If the length is less than the current length, the blob is truncated.
        fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

        /// Ensure all pending data is durably persisted.
        fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
    }
797
    /// Interface that any runtime must implement to provide buffer pools.
    ///
    /// Network and storage I/O are served by distinct pools (presumably so each
    /// can be sized and tuned independently — confirm with implementations).
    pub trait BufferPooler: Clone + Send + Sync + 'static {
        /// Returns the network [BufferPool].
        fn network_buffer_pool(&self) -> &BufferPool;

        /// Returns the storage [BufferPool].
        fn storage_buffer_pool(&self) -> &BufferPool;
    }
806});
807stability_scope!(BETA, cfg(feature = "external") {
    /// Interface that runtimes can implement to constrain the execution latency of a future.
    pub trait Pacer: Clock + Clone + Send + Sync + 'static {
        /// Defer completion of a future until a specified `latency` has elapsed. If the future is
        /// not yet ready at the desired time of completion, the runtime will block until the future
        /// is ready.
        ///
        /// In [crate::deterministic], this is used to ensure interactions with external systems
        /// complete deterministically. In [crate::tokio], this is a no-op (allows
        /// multiple runtimes to be tested with no code changes).
        ///
        /// # Setting Latency
        ///
        /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
        /// the expected resolution latency of the future. To better explore the possible behavior of an
        /// application, users can set latency to a randomly chosen value in the range of
        /// `[expected latency / 2, expected latency * 2]`.
        ///
        /// # Warning
        ///
        /// Because `pace` blocks if the future is not ready, it is important that the future's completion
        /// doesn't require anything in the current thread to complete (or else it will deadlock).
        fn pace<'a, F, T>(
            &'a self,
            latency: Duration,
            future: F,
        ) -> impl Future<Output = T> + Send + 'a
        where
            F: Future<Output = T> + Send + 'a,
            T: Send + 'a;
    }
838
    /// Extension trait that makes it more ergonomic to use [Pacer].
    ///
    /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
    /// runtime should delay completion relative to the clock.
    pub trait FutureExt: Future + Send + Sized {
        /// Delay completion of the future until a specified `latency` on `pacer`.
        ///
        /// Equivalent to calling [`Pacer::pace`] with `self` as the future.
        fn pace<'a, E>(
            self,
            pacer: &'a E,
            latency: Duration,
        ) -> impl Future<Output = Self::Output> + Send + 'a
        where
            E: Pacer + 'a,
            Self: Send + 'a,
            Self::Output: Send + 'a,
        {
            pacer.pace(latency, self)
        }
    }

    // Blanket implementation: every sendable future gains `.pace(...)`.
    impl<F> FutureExt for F where F: Future + Send {}
860});
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865    use crate::telemetry::traces::collector::TraceStorage;
866    use bytes::Bytes;
867    use commonware_macros::{select, test_collect_traces};
868    use commonware_utils::{
869        channel::{mpsc, oneshot},
870        sync::Mutex,
871        NZUsize, SystemTimeExt,
872    };
873    use futures::{
874        future::{pending, ready},
875        join, pin_mut, FutureExt,
876    };
877    use prometheus_client::{
878        encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue},
879        metrics::{counter::Counter, family::Family},
880    };
881    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
882    use std::{
883        collections::HashMap,
884        net::{IpAddr, Ipv4Addr, Ipv6Addr},
885        pin::Pin,
886        str::FromStr,
887        sync::{
888            atomic::{AtomicU32, Ordering},
889            Arc,
890        },
891        task::{Context as TContext, Poll, Waker},
892    };
893    use tracing::{error, Level};
894    use utils::reschedule;
895
896    fn test_error_future<R: Runner>(runner: R) {
897        #[allow(clippy::unused_async)]
898        async fn error_future() -> Result<&'static str, &'static str> {
899            Err("An error occurred")
900        }
901        let result = runner.start(|_| error_future());
902        assert_eq!(result, Err("An error occurred"));
903    }
904
905    fn test_clock_sleep<R: Runner>(runner: R)
906    where
907        R::Context: Spawner + Clock,
908    {
909        runner.start(|context| async move {
910            // Capture initial time
911            let start = context.current();
912            let sleep_duration = Duration::from_millis(10);
913            context.sleep(sleep_duration).await;
914
915            // After run, time should have advanced
916            let end = context.current();
917            assert!(end.duration_since(start).unwrap() >= sleep_duration);
918        });
919    }
920
921    fn test_clock_sleep_until<R: Runner>(runner: R)
922    where
923        R::Context: Spawner + Clock + Metrics,
924    {
925        runner.start(|context| async move {
926            // Trigger sleep
927            let now = context.current();
928            context.sleep_until(now + Duration::from_millis(100)).await;
929
930            // Ensure slept duration has elapsed
931            let elapsed = now.elapsed().unwrap();
932            assert!(elapsed >= Duration::from_millis(100));
933        });
934    }
935
    /// Verify that sleeping until the maximum representable time never completes:
    /// the surrounding 1ms `timeout` must fire first.
    fn test_clock_sleep_until_far_future<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // A deadline at the clock's limit should effectively never resolve.
            let sleep = context.sleep_until(SystemTime::limit());
            let result = context.timeout(Duration::from_millis(1), sleep).await;
            assert!(matches!(result, Err(Error::Timeout)));
        });
    }
946
947    fn test_clock_timeout<R: Runner>(runner: R)
948    where
949        R::Context: Spawner + Clock,
950    {
951        runner.start(|context| async move {
952            // Future completes before timeout
953            let result = context
954                .timeout(Duration::from_millis(100), async { "success" })
955                .await;
956            assert_eq!(result.unwrap(), "success");
957
958            // Future exceeds timeout duration
959            let result = context
960                .timeout(Duration::from_millis(50), pending::<()>())
961                .await;
962            assert!(matches!(result, Err(Error::Timeout)));
963
964            // Future completes within timeout
965            let result = context
966                .timeout(
967                    Duration::from_millis(100),
968                    context.sleep(Duration::from_millis(50)),
969                )
970                .await;
971            assert!(result.is_ok());
972        });
973    }
974
    /// Verify the runtime exits once the root future returns, even while a
    /// detached spawned task loops forever.
    fn test_root_finishes<R: Runner>(runner: R)
    where
        R::Context: Spawner,
    {
        runner.start(|context| async move {
            // Detach a task that yields forever; its handle is dropped, so the
            // only way this test terminates is the root future finishing.
            context.spawn(|_| async move {
                loop {
                    reschedule().await;
                }
            });
        });
    }
987
    /// Spawn a task, abort it, then spawn again from a context cloned beforehand;
    /// the later task is expected to resolve to `Error::Closed`.
    fn test_spawn_after_abort<R>(runner: R)
    where
        R: Runner,
        R::Context: Spawner + Clone,
    {
        runner.start(|context| async move {
            // Create a child context
            let child = context.clone();

            // Spawn parent and abort
            let parent_handle = context.spawn(move |_| async move {
                pending::<()>().await;
            });
            parent_handle.abort();

            // Spawn child and ensure it aborts
            // NOTE(review): this relies on the abort propagating to tasks
            // spawned later from the cloned context — confirm intended semantics.
            let child_handle = child.spawn(move |_| async move {
                pending::<()>().await;
            });
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1010
1011    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
1012    where
1013        R::Context: Spawner,
1014    {
1015        runner.start(|context| async move {
1016            let context = if dedicated {
1017                assert!(!blocking);
1018                context.dedicated()
1019            } else {
1020                context.shared(blocking)
1021            };
1022
1023            let handle = context.spawn(|_| async move {
1024                loop {
1025                    reschedule().await;
1026                }
1027            });
1028            handle.abort();
1029            assert!(matches!(handle.await, Err(Error::Closed)));
1030        });
1031    }
1032
1033    fn test_panic_aborts_root<R: Runner>(runner: R) {
1034        let result: Result<(), Error> = runner.start(|_| async move {
1035            panic!("blah");
1036        });
1037        result.unwrap_err();
1038    }
1039
    /// Panic in a detached spawned task while the root loops forever; the test
    /// only terminates if the panic aborts execution.
    ///
    /// NOTE(review): presumably invoked under `#[should_panic]` (or an
    /// equivalent expectation) at call sites — confirm.
    fn test_panic_aborts_spawn<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // The handle is dropped, so the panic cannot be observed by awaiting.
            context.clone().spawn(|_| async move {
                panic!("blah");
            });

            // Loop until panic
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
1055
1056    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1057    where
1058        R::Context: Spawner + Clock,
1059    {
1060        let result: Result<(), Error> = runner.start(|context| async move {
1061            let result = context.clone().spawn(|_| async move {
1062                panic!("blah");
1063            });
1064            result.await
1065        });
1066        assert!(matches!(result, Err(Error::Exited)));
1067    }
1068
    /// Panic in several detached spawned tasks while the root loops forever; the
    /// test only terminates if a panic aborts execution.
    ///
    /// NOTE(review): presumably invoked under `#[should_panic]` (or an
    /// equivalent expectation) at call sites — confirm.
    fn test_multiple_panics<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // All three handles are dropped; the panics are unobserved.
            context.clone().spawn(|_| async move {
                panic!("boom 1");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 2");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 3");
            });

            // Loop until panic
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
1090
    /// Verify that panics in multiple spawned tasks are each surfaced as
    /// `Error::Exited` when their handles are awaited (none propagate).
    fn test_multiple_panics_caught<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        let (res1, res2, res3) = runner.start(|context| async move {
            let handle1 = context.clone().spawn(|_| async move {
                panic!("boom 1");
            });
            let handle2 = context.clone().spawn(|_| async move {
                panic!("boom 2");
            });
            let handle3 = context.clone().spawn(|_| async move {
                panic!("boom 3");
            });

            // Await all three handles concurrently and return their results.
            join!(handle1, handle2, handle3)
        });
        assert!(matches!(res1, Err(Error::Exited)));
        assert!(matches!(res2, Err(Error::Exited)));
        assert!(matches!(res3, Err(Error::Exited)));
    }
1112
    /// Verify `select!` semantics: when multiple branches are ready, the first
    /// listed branch wins; a pending branch never fires.
    fn test_select<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Test first branch
            // Both branches are immediately ready; the first listed must win.
            let output = Mutex::new(0);
            select! {
                v1 = ready(1) => {
                    *output.lock() = v1;
                },
                v2 = ready(2) => {
                    *output.lock() = v2;
                },
            };
            assert_eq!(*output.lock(), 1);

            // Test second branch
            // The first branch can never complete, so the second must be taken.
            select! {
                v1 = std::future::pending::<i32>() => {
                    *output.lock() = v1;
                },
                v2 = ready(2) => {
                    *output.lock() = v2;
                },
            };
            assert_eq!(*output.lock(), 2);
        });
    }
1139
    /// Ensure future fusing works as expected.
    ///
    /// Exercises `select!` in a loop against a channel: timeouts fire while the
    /// channel is empty, an already-ready branch can preempt reading a populated
    /// channel without losing its messages, and the messages are then drained
    /// in order.
    fn test_select_loop<R: Runner>(runner: R)
    where
        R::Context: Clock,
    {
        runner.start(|context| async move {
            // Should hit timeout
            let (sender, mut receiver) = mpsc::unbounded_channel();
            for _ in 0..2 {
                select! {
                    v = receiver.recv() => {
                        panic!("unexpected value: {v:?}");
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }

            // Populate channel
            sender.send(0).unwrap();
            sender.send(1).unwrap();

            // Prefer not reading channel without losing messages
            select! {
                _ = async {} => {
                    // Skip reading from channel even though populated
                },
                v = receiver.recv() => {
                    panic!("unexpected value: {v:?}");
                },
            };

            // Process messages
            // Both queued values must still be delivered, in send order.
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.recv() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
1186
    /// End-to-end `Storage` smoke test: open, write, sync, read, scan, reopen,
    /// then remove the blob and finally the whole partition.
    fn test_storage_operations<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob";

            // Open a new blob
            let (blob, size) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0, "new blob should have size 0");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob
            let read = blob
                .read_at(0, data.len())
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Scan blobs in the partition
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(blobs.contains(&name.to_vec()));

            // Reopen the blob
            let (blob, len) = context
                .open(partition, name)
                .await
                .expect("Failed to reopen blob");
            assert_eq!(len, data.len() as u64);

            // Read data part of message back
            // Offset 7 skips "Hello, " and reads the 7-byte "Storage" suffix.
            let read = blob.read_at(7, 7).await.expect("Failed to read data");
            assert_eq!(read.coalesce(), b"Storage");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Remove the blob
            context
                .remove(partition, Some(name))
                .await
                .expect("Failed to remove blob");

            // Ensure the blob is removed
            let blobs = context
                .scan(partition)
                .await
                .expect("Failed to scan partition");
            assert!(!blobs.contains(&name.to_vec()));

            // Remove the partition
            context
                .remove(partition, None)
                .await
                .expect("Failed to remove partition");

            // Scan the partition
            // Scanning a removed partition must report it as missing.
            let result = context.scan(partition).await;
            assert!(matches!(result, Err(Error::PartitionMissing(_))));
        });
    }
1266
    /// Verify blob writes at adjacent offsets, overwrites that preserve length,
    /// and that reads past the end of the blob fail.
    fn test_blob_read_write<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write data at different offsets
            // Two 5-byte writes produce a contiguous 10-byte blob.
            let data1 = b"Hello";
            let data2 = b"World";
            blob.write_at(0, data1)
                .await
                .expect("Failed to write data1");
            blob.write_at(5, data2)
                .await
                .expect("Failed to write data2");

            // Read data back
            let read = blob.read_at(0, 10).await.expect("Failed to read data");
            let read = read.coalesce();
            assert_eq!(&read.as_ref()[..5], data1);
            assert_eq!(&read.as_ref()[5..], data2);

            // Read past end of blob
            let result = blob.read_at(10, 10).await;
            assert!(result.is_err());

            // Rewrite data without affecting length
            let data3 = b"Store";
            blob.write_at(5, data3)
                .await
                .expect("Failed to write data3");

            // Read data back
            let read = blob.read_at(0, 10).await.expect("Failed to read data");
            let read = read.coalesce();
            assert_eq!(&read.as_ref()[..5], data1);
            assert_eq!(&read.as_ref()[5..], data3);

            // Read past end of blob
            // Overwriting in place must not have extended the blob.
            let result = blob.read_at(10, 10).await;
            assert!(result.is_err());
        });
    }
1318
    /// Verify `Blob::resize` in both directions: extending zero-fills the new
    /// region, truncating shrinks the length, and both survive reopen.
    fn test_blob_resize<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition_resize";
            let name = b"test_blob_resize";

            // Open and write to a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            let data = b"some data";
            blob.write_at(0, data.to_vec())
                .await
                .expect("Failed to write");
            blob.sync().await.expect("Failed to sync after write");

            // Re-open and check length
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);

            // Resize to extend the file
            let new_len = (data.len() as u64) * 2;
            blob.resize(new_len)
                .await
                .expect("Failed to resize to extend");
            blob.sync().await.expect("Failed to sync after resize");

            // Re-open and check length again
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, new_len);

            // Read original data
            let read_buf = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);

            // Read extended part (should be zeros)
            let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
            assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());

            // Truncate the blob
            blob.resize(data.len() as u64).await.unwrap();
            blob.sync().await.unwrap();

            // Reopen to check truncation
            let (blob, size) = context.open(partition, name).await.unwrap();
            assert_eq!(size, data.len() as u64);

            // Read truncated data
            // The original prefix must be intact after truncation.
            let read_buf = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read_buf.coalesce(), data);
            blob.sync().await.unwrap();
        });
    }
1376
    /// Verify that identically-named blobs in different partitions are
    /// independent: each partition's blob gets its own length and contents.
    fn test_many_partition_read_write<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partitions = ["partition1", "partition2", "partition3"];
            let name = b"test_blob_rw";
            let data1 = b"Hello";
            let data2 = b"World";

            for (additional, partition) in partitions.iter().enumerate() {
                // Open a new blob
                let (blob, _) = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");

                // Write data at different offsets
                // Each partition writes data2 `additional` bytes further out,
                // giving every partition a distinct blob length.
                blob.write_at(0, data1)
                    .await
                    .expect("Failed to write data1");
                blob.write_at(5 + additional as u64, data2)
                    .await
                    .expect("Failed to write data2");

                // Sync the blob
                blob.sync().await.expect("Failed to sync blob");
            }

            for (additional, partition) in partitions.iter().enumerate() {
                // Open a new blob
                let (blob, len) = context
                    .open(partition, name)
                    .await
                    .expect("Failed to open blob");
                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);

                // Read data back
                // The assertions skip any gap bytes between offsets 5 and
                // 5 + additional, which were never written.
                let read = blob
                    .read_at(0, 10 + additional)
                    .await
                    .expect("Failed to read data");
                let read = read.coalesce();
                assert_eq!(&read.as_ref()[..5], b"Hello");
                assert_eq!(&read.as_ref()[5 + additional..], b"World");
            }
        });
    }
1425
    /// Verify reads extending past the blob's length fail, for both an empty
    /// blob and one whose length is shorter than the requested range.
    fn test_blob_read_past_length<R: Runner>(runner: R)
    where
        R::Context: Storage,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Read data past file length (empty file)
            let result = blob.read_at(0, 10).await;
            assert!(result.is_err());

            // Write data to the blob
            let data = b"Hello, Storage!".to_vec();
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Read data past file length (non-empty file)
            // The blob holds 15 bytes, so a 20-byte read must fail.
            let result = blob.read_at(0, 20).await;
            assert!(result.is_err());
        })
    }
1455
    /// Verify that clones of a blob can be read concurrently from spawned tasks,
    /// and that dropping all clones releases the blob (per the metrics).
    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
    where
        R::Context: Spawner + Storage + Metrics,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open a new blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write data to the blob
            let data = b"Hello, Storage!";
            blob.write_at(0, data)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read data from the blob in clone
            // Each spawned task owns its own clone of the blob.
            let check1 = context.with_label("check1").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, data_len)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });
            let check2 = context.with_label("check2").spawn({
                let blob = blob.clone();
                let data_len = data.len();
                move |_| async move {
                    let read = blob
                        .read_at(0, data_len)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.coalesce(), data);
                }
            });

            // Wait for both reads to complete
            let result = join!(check1, check2);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());

            // Read data from the blob
            // The original handle still works after the clones completed.
            let read = blob
                .read_at(0, data.len())
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.coalesce(), data);

            // Drop the blob
            drop(blob);

            // Ensure no blobs still open
            // NOTE(review): relies on the runtime exposing an `open_blobs` gauge
            // in its encoded metrics — confirm the metric name stays stable.
            let buffer = context.encode();
            assert!(buffer.contains("open_blobs 0"));
        });
    }
1523
    /// Verify shutdown signaling: a task waiting on `stopped()` observes the
    /// stop value, and `stopped()` called after `stop()` resolves immediately.
    fn test_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        // Arbitrary stop value; both tasks must observe exactly this.
        let kill = 9;
        runner.start(|context| async move {
            // Spawn a task that waits for signal
            let before = context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    drop(signal);
                });

            // Signal the tasks and wait for them to stop
            let result = context.clone().stop(kill, None).await;
            assert!(result.is_ok());

            // Spawn a task after stop is called
            let after = context
                .with_label("after")
                .spawn(move |context| async move {
                    // A call to `stopped()` after `stop()` resolves immediately
                    let value = context.stopped().await.unwrap();
                    assert_eq!(value, kill);
                });

            // Ensure both tasks complete
            let result = join!(before, after);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
        });
    }
1559
    /// Verifies that `stop()` does not resolve until *every* task holding a
    /// shutdown signal has finished its cleanup: three tasks with staggered
    /// cleanup delays must all increment the counter before `stop()` returns.
    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            // Channel used by tasks to report that they acquired a signal.
            let (started_tx, mut started_rx) = mpsc::channel(3);
            // Shared count of completed cleanups.
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn 3 tasks that do cleanup work after receiving stop signal
            // and increment a shared counter
            let task = |cleanup_duration: Duration| {
                let context = context.clone();
                let counter = counter.clone();
                let started_tx = started_tx.clone();
                context.spawn(move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).await.unwrap();

                    // Increment once killed
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    context.sleep(cleanup_duration).await;
                    counter.fetch_add(1, Ordering::SeqCst);

                    // Wait to drop signal until work has been done
                    drop(signal);
                })
            };

            let task1 = task(Duration::from_millis(10));
            let task2 = task(Duration::from_millis(20));
            let task3 = task(Duration::from_millis(30));

            // Give tasks time to start
            for _ in 0..3 {
                started_rx.recv().await.unwrap();
            }

            // Stop and verify all cleanup completed
            // (the counter check only holds if `stop` waited for all signals)
            context.stop(kill, None).await.unwrap();
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // Ensure all tasks completed
            let result = join!(task1, task2, task3);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            assert!(result.2.is_ok());
        });
    }
1611
    /// Verifies that `stop()` with a deadline returns `Error::Timeout` when a
    /// task holds its shutdown signal forever and never completes cleanup.
    fn test_shutdown_timeout<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            // Setup startup coordinator
            let (started_tx, started_rx) = oneshot::channel();

            // Spawn a task that never completes its cleanup
            context.clone().spawn(move |context| async move {
                // Acquire the signal but never drop it: `pending()` below
                // blocks this task forever, so the `signal.await` is
                // unreachable and shutdown can never finish cleanly.
                let signal = context.stopped();
                started_tx.send(()).unwrap();
                pending::<()>().await;
                signal.await.unwrap();
            });

            // Try to stop with a timeout
            started_rx.await.unwrap();
            let result = context.stop(kill, Some(Duration::from_millis(100))).await;

            // Assert that we got a timeout error
            assert!(matches!(result, Err(Error::Timeout)));
        });
    }
1637
    /// Verifies that concurrent `stop()` calls are idempotent: both resolve
    /// only once cleanup finishes, the *first* stop value wins, and a stop
    /// issued after shutdown completes resolves immediately.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        // Two distinct stop values; only `kill1` should ever be observed.
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays completion to test timing
            let task = context.with_label("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Wait for signal to be acquired
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // Wait for signal to be resolved
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    // Sleep keeps the signal held so the in-flight stop
                    // futures below stay pending at the `now_or_never` checks.
                    context.sleep(Duration::from_millis(50)).await;

                    // Increment counter
                    counter.fetch_add(1, Ordering::SeqCst);
                    drop(signal);
                }
            });

            // Give task time to start
            started_rx.await.unwrap();

            // Issue two separate stop calls
            // The second stop call uses a different stop value that should be ignored
            let stop_task1 = context.clone().stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.clone().stop(kill2, None);
            pin_mut!(stop_task2);

            // Both of them should be awaiting completion
            // (`now_or_never` polls exactly once without blocking)
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Wait for both stop calls to complete
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // Verify first stop value wins
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // Wait for blocking task to complete
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Post-completion stop should return immediately
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1699
    /// Verifies the runtime tears down cleanly when a task is still blocked on
    /// a shutdown signal that is never fired: the runner's root future ends
    /// while the spawned task remains pending on `stopped()`.
    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics,
    {
        runner.start(|context| async move {
            // Spawn a task that waits for signal
            context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();

                    // We should never reach this point
                    assert_eq!(value, 42);
                    drop(signal);
                });

            // Ensure waker is registered
            // (yield once so the spawned task gets polled before the root
            // future completes and the runtime shuts down)
            reschedule().await;
        });
    }
1721
1722    fn test_spawn_dedicated<R: Runner>(runner: R)
1723    where
1724        R::Context: Spawner,
1725    {
1726        runner.start(|context| async move {
1727            let handle = context.dedicated().spawn(|_| async move { 42 });
1728            assert!(matches!(handle.await, Ok(42)));
1729        });
1730    }
1731
    /// Verifies basic parent/child spawning: a parent task can spawn a child,
    /// the child completes independently, and the parent completes once
    /// released — all coordinated through oneshot channels.
    fn test_spawn<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Slot used to smuggle the child's handle out of the parent task.
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn child that completes immediately
                let handle = context.spawn(|_| async {});

                // Store child handle so we can test it later
                *child_handle2.lock() = Some(handle);

                parent_initialized_tx.send(()).unwrap();

                // Parent task completes
                parent_complete_rx.await.unwrap();
            });

            // Wait for parent task to spawn the children
            parent_initialized_rx.await.unwrap();

            // Child task completes successfully
            // (awaited while the parent is still alive, so no abort applies)
            let child_handle = child_handle.lock().take().unwrap();
            assert!(child_handle.await.is_ok());

            // Complete the parent task
            parent_complete_tx.send(()).unwrap();

            // Wait for parent task to complete successfully
            assert!(parent_handle.await.is_ok());
        });
    }
1769
    /// Verifies abort propagation: aborting a parent task also aborts its
    /// still-running child, and both handles resolve with `Error::Closed`.
    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Slot used to smuggle the child's handle out of the parent task.
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn child task that hangs forever, should be aborted when parent aborts
                let handle = context.spawn(|_| pending::<()>());

                // Store child task handle so we can test it later
                *child_handle2.lock() = Some(handle);

                parent_initialized_tx.send(()).unwrap();

                // Parent task runs until aborted
                pending::<()>().await
            });

            // Wait for parent task to spawn the children
            parent_initialized_rx.await.unwrap();

            // Abort parent task
            parent_handle.abort();
            assert!(matches!(parent_handle.await, Err(Error::Closed)));

            // Child task should also resolve with error since its parent aborted
            let child_handle = child_handle.lock().take().unwrap();
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1804
    /// Verifies that a child task is aborted when its parent *completes*
    /// normally (not just when aborted): the orphaned child's handle resolves
    /// with `Error::Closed`.
    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Slot used to smuggle the child's handle out of the parent task.
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn child task that hangs forever, should be aborted when parent completes
                let handle = context.spawn(|_| pending::<()>());

                // Store child task handle so we can test it later
                *child_handle2.lock() = Some(handle);

                // Parent task completes
                parent_complete_rx.await.unwrap();
            });

            // Fire parent completion
            parent_complete_tx.send(()).unwrap();

            // Wait for parent task to complete
            assert!(parent_handle.await.is_ok());

            // Child task should resolve with error since its parent has completed
            let child_handle = child_handle.lock().take().unwrap();
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1836
    /// Verifies that aborting the root of a three-level task tree (root ->
    /// 3 children -> 6 grandchildren) cascades: every descendant handle
    /// resolves with `Error::Closed`.
    fn test_spawn_cascading_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // We create the following tree of tasks. All tasks will run
            // indefinitely (until aborted).
            //
            //          root
            //     /     |     \
            //    /      |      \
            //   c0      c1      c2
            //  /  \    /  \    /  \
            // g0  g1  g2  g3  g4  g5
            //
            // NOTE: contexts are cloned *here* (outside the root task) and
            // moved in, so parentage is established by which context each
            // spawn is called on — TODO confirm clone inherits parent linkage.
            let c0 = context.clone();
            let g0 = c0.clone();
            let g1 = c0.clone();
            let c1 = context.clone();
            let g2 = c1.clone();
            let g3 = c1.clone();
            let c2 = context.clone();
            let g4 = c2.clone();
            let g5 = c2.clone();

            // Spawn tasks
            let handles = Arc::new(Mutex::new(Vec::new()));
            // One readiness message per spawned descendant (3 + 6 = 9).
            let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
            let root_task = context.spawn({
                let handles = handles.clone();
                move |_| async move {
                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                    {
                        let handle = context.spawn({
                            let handles = handles.clone();
                            let initialized_tx = initialized_tx.clone();
                            move |_| async move {
                                for grandchild in grandchildren {
                                    let handle = grandchild.spawn(|_| async {
                                        pending::<()>().await;
                                    });
                                    handles.lock().push(handle);
                                    initialized_tx.send(()).await.unwrap();
                                }

                                pending::<()>().await;
                            }
                        });
                        handles.lock().push(handle);
                        initialized_tx.send(()).await.unwrap();
                    }

                    pending::<()>().await;
                }
            });

            // Wait for tasks to initialize
            for _ in 0..9 {
                initialized_rx.recv().await.unwrap();
            }

            // Verify we have all 9 handles (3 children + 6 grandchildren)
            assert_eq!(handles.lock().len(), 9);

            // Abort root task
            root_task.abort();
            assert!(matches!(root_task.await, Err(Error::Closed)));

            // All handles should resolve with error due to cascading abort
            let handles = handles.lock().drain(..).collect::<Vec<_>>();
            for handle in handles {
                assert!(matches!(handle.await, Err(Error::Closed)));
            }
        });
    }
1911
    /// Verifies abort scoping: when one child of a parent completes, its
    /// *sibling* (and the parent itself) keep running — completion of one
    /// branch must not tear down the others.
    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Oneshot pairs coordinate startup, completion, and handle
            // handoff for the child, the sibling, and the parent.
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (child_complete_tx, child_complete_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

            let parent = context.spawn(move |context| async move {
                // Spawn a child task
                let child_handle = context.clone().spawn(|_| async move {
                    child_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    child_complete_rx.await.unwrap();
                });
                assert!(
                    child_handle_tx.send(child_handle).is_ok(),
                    "child handle receiver dropped"
                );

                // Spawn an independent sibling task
                let sibling_handle = context.clone().spawn(move |_| async move {
                    sibling_started_tx.send(()).unwrap();
                    // Wait for signal to complete
                    sibling_complete_rx.await.unwrap();
                });
                assert!(
                    sibling_handle_tx.send(sibling_handle).is_ok(),
                    "sibling handle receiver dropped"
                );

                // Wait for signal to complete
                parent_complete_rx.await.unwrap();
            });

            // Wait for both to start
            child_started_rx.await.unwrap();
            sibling_started_rx.await.unwrap();

            // Kill the sibling
            sibling_complete_tx.send(()).unwrap();
            assert!(sibling_handle_rx.await.is_ok());

            // The child task should still be alive
            child_complete_tx.send(()).unwrap();
            assert!(child_handle_rx.await.is_ok());

            // As well as the parent
            parent_complete_tx.send(()).unwrap();
            assert!(parent.await.is_ok());
        });
    }
1969
    /// Verifies abort cascades through a parent -> child -> grandchild chain
    /// built with cloned contexts: aborting the parent errors every handle
    /// down the chain.
    fn test_spawn_clone_chain<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (parent_started_tx, parent_started_rx) = oneshot::channel();
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();

            // Each level spawns from a clone of its own context, then parks
            // on `pending()` so it only ends via the cascading abort.
            let parent = context.clone().spawn({
                move |context| async move {
                    let child = context.clone().spawn({
                        move |context| async move {
                            let grandchild = context.clone().spawn({
                                move |_| async move {
                                    grandchild_started_tx.send(()).unwrap();
                                    pending::<()>().await;
                                }
                            });
                            assert!(
                                grandchild_handle_tx.send(grandchild).is_ok(),
                                "grandchild handle receiver dropped"
                            );
                            child_started_tx.send(()).unwrap();
                            pending::<()>().await;
                        }
                    });
                    assert!(
                        child_handle_tx.send(child).is_ok(),
                        "child handle receiver dropped"
                    );
                    parent_started_tx.send(()).unwrap();
                    pending::<()>().await;
                }
            });

            // Ensure all three levels are running before aborting.
            parent_started_rx.await.unwrap();
            child_started_rx.await.unwrap();
            grandchild_started_rx.await.unwrap();

            let child_handle = child_handle_rx.await.unwrap();
            let grandchild_handle = grandchild_handle_rx.await.unwrap();

            parent.abort();
            assert!(parent.await.is_err());

            // Descendants must fail too (cascading abort).
            assert!(child_handle.await.is_err());
            assert!(grandchild_handle.await.is_err());
        });
    }
2022
    /// Verifies that abort still cascades across a chain of *intermediate*
    /// context clones that never spawn: a leaf spawned from clone-of-clone-of-
    /// clone is aborted when the parent task is aborted.
    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();

            let parent = context.clone().spawn({
                move |context| async move {
                    // Clone several times without spawning — the leaf below
                    // should still be tied back to this parent task.
                    let clone1 = context.clone();
                    let clone2 = clone1.clone();
                    let clone3 = clone2.clone();

                    let leaf = clone3.clone().spawn({
                        move |_| async move {
                            leaf_started_tx.send(()).unwrap();
                            pending::<()>().await;
                        }
                    });

                    leaf_handle_tx
                        .send(leaf)
                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
                    pending::<()>().await;
                }
            });

            // Wait until the leaf is running and its handle is recovered.
            leaf_started_rx.await.unwrap();
            let leaf_handle = leaf_handle_rx.await.unwrap();

            parent.abort();
            assert!(parent.await.is_err());
            assert!(leaf_handle.await.is_err());
        });
    }
2059
2060    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2061    where
2062        R::Context: Spawner,
2063    {
2064        runner.start(|context| async move {
2065            let context = if dedicated {
2066                context.dedicated()
2067            } else {
2068                context.shared(true)
2069            };
2070
2071            let handle = context.spawn(|_| async move { 42 });
2072            let result = handle.await;
2073            assert!(matches!(result, Ok(42)));
2074        });
2075    }
2076
2077    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2078    where
2079        R::Context: Spawner + Clock,
2080    {
2081        runner.start(|context| async move {
2082            let context = if dedicated {
2083                context.dedicated()
2084            } else {
2085                context.shared(true)
2086            };
2087
2088            context.clone().spawn(|_| async move {
2089                panic!("blocking task panicked");
2090            });
2091
2092            // Loop until panic
2093            loop {
2094                context.sleep(Duration::from_millis(100)).await;
2095            }
2096        });
2097    }
2098
2099    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2100    where
2101        R::Context: Spawner + Clock,
2102    {
2103        let result: Result<(), Error> = runner.start(|context| async move {
2104            let context = if dedicated {
2105                context.dedicated()
2106            } else {
2107                context.shared(true)
2108            };
2109
2110            let handle = context.clone().spawn(|_| async move {
2111                panic!("blocking task panicked");
2112            });
2113            handle.await
2114        });
2115        assert!(matches!(result, Err(Error::Exited)));
2116    }
2117
    /// Verifies that tearing down a (nested deterministic) runtime frees
    /// tasks that are deadlocked on each other through channels: after the
    /// inner runtime drops, the tracked `Arc` must have no clones left.
    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Setup tracked resource
            // (each deadlocked task clones this; teardown must release them)
            let dropper = Arc::new(());
            let executor = deterministic::Runner::default();
            executor.start({
                let dropper = dropper.clone();
                move |context| async move {
                    // Create tasks with circular dependencies through channels
                    let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                    let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                    let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                    // Task 1 holds tx2 and waits on rx1
                    context.with_label("task1").spawn({
                        let setup_tx = setup_tx.clone();
                        let dropper = dropper.clone();
                        move |_| async move {
                            // Setup deadlock and mark ready
                            tx2.send(()).unwrap();
                            rx1.recv().await.unwrap();
                            setup_tx.send(()).unwrap();

                            // Wait forever
                            // (tx2 stays alive in this future, so task2's
                            // `rx2.recv()` can never return `None`)
                            while rx1.recv().await.is_some() {}
                            drop(tx2);
                            drop(dropper);
                        }
                    });

                    // Task 2 holds tx1 and waits on rx2
                    context.with_label("task2").spawn(move |_| async move {
                        // Setup deadlock and mark ready
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Wait forever
                        while rx2.recv().await.is_some() {}
                        drop(tx1);
                        drop(dropper);
                    });

                    // Wait for tasks to start
                    setup_rx.recv().await.unwrap();
                    setup_rx.recv().await.unwrap();
                }
            });

            // After runtime drop, both tasks should be cleaned up
            // (`try_unwrap` succeeds only if every clone of `dropper` died)
            Arc::try_unwrap(dropper).expect("references remaining");
        });
    }
2171
    /// Regression test: waking a task's `Waker` *after* its runtime has been
    /// torn down must not panic or crash. A guard returned from `start`
    /// fires the captured waker on drop, once the executor is gone.
    fn test_late_waker<R: Runner>(runner: R)
    where
        R::Context: Metrics + Spawner,
    {
        // A future that captures its waker and sends it to the caller, then
        // stays pending forever.
        struct CaptureWaker {
            // Oneshot used to hand the waker out; consumed on first poll.
            tx: Option<oneshot::Sender<Waker>>,
            // Guards against sending more than once across repolls.
            sent: bool,
        }
        impl Future for CaptureWaker {
            type Output = ();
            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
                if !self.sent {
                    if let Some(tx) = self.tx.take() {
                        // Send a clone of the current task's waker to the root
                        let _ = tx.send(cx.waker().clone());
                    }
                    self.sent = true;
                }
                Poll::Pending
            }
        }

        // A guard that wakes the captured waker on drop.
        struct WakeOnDrop(Option<Waker>);
        impl Drop for WakeOnDrop {
            fn drop(&mut self) {
                if let Some(w) = self.0.take() {
                    w.wake_by_ref();
                }
            }
        }

        // Run the executor to completion
        let holder = runner.start(|context| async move {
            // Wire a oneshot to receive the task waker.
            let (tx, rx) = oneshot::channel::<Waker>();

            // Spawn a task that registers its waker and then stays pending.
            context
                .with_label("capture_waker")
                .spawn(move |_| async move {
                    CaptureWaker {
                        tx: Some(tx),
                        sent: false,
                    }
                    .await;
                });

            // Ensure the spawned task runs and registers its waker.
            utils::reschedule().await;

            // Receive the waker from the spawned task.
            let waker = rx.await.expect("waker not received");

            // Return a guard that will wake after the runtime has dropped.
            WakeOnDrop(Some(waker))
        });

        // Dropping the guard after the runtime has torn down will trigger a wake on
        // a task whose executor has been dropped.
        drop(holder);
    }
2236
2237    fn test_metrics<R: Runner>(runner: R)
2238    where
2239        R::Context: Metrics,
2240    {
2241        runner.start(|context| async move {
2242            // Assert label
2243            assert_eq!(context.label(), "");
2244
2245            // Register a metric
2246            let counter = Counter::<u64>::default();
2247            context.register("test", "test", counter.clone());
2248
2249            // Increment the counter
2250            counter.inc();
2251
2252            // Encode metrics
2253            let buffer = context.encode();
2254            assert!(buffer.contains("test_total 1"));
2255
2256            // Nested context
2257            let context = context.with_label("nested");
2258            let nested_counter = Counter::<u64>::default();
2259            context.register("test", "test", nested_counter.clone());
2260
2261            // Increment the counter
2262            nested_counter.inc();
2263
2264            // Encode metrics
2265            let buffer = context.encode();
2266            assert!(buffer.contains("nested_test_total 1"));
2267            assert!(buffer.contains("test_total 1"));
2268        });
2269    }
2270
    /// Context attributes added via `with_attribute` must surface as Prometheus
    /// labels on registered metrics: distinct values of the same attribute yield
    /// separate time series sharing a single HELP/TYPE header, and multiple
    /// attributes are emitted in alphabetical key order.
    fn test_metrics_with_attribute<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Create context with an attribute
            let ctx_epoch5 = context
                .with_label("consensus")
                .with_attribute("epoch", "e5");

            // Register a metric with the attribute
            let counter = Counter::<u64>::default();
            ctx_epoch5.register("votes", "vote count", counter.clone());
            counter.inc();

            // Encode and verify the attribute appears as a label
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch attribute, got: {}",
                buffer
            );

            // Create context with different epoch attribute (same metric name)
            let ctx_epoch6 = context
                .with_label("consensus")
                .with_attribute("epoch", "e6");
            let counter2 = Counter::<u64>::default();
            ctx_epoch6.register("votes", "vote count", counter2.clone());
            counter2.inc();
            counter2.inc();

            // Both should appear in encoded output with canonical format (single HELP/TYPE)
            let buffer = context.encode();
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
                "Expected metric with epoch=e5, got: {}",
                buffer
            );
            assert!(
                buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
                "Expected metric with epoch=e6, got: {}",
                buffer
            );

            // Verify canonical format: HELP and TYPE should appear exactly once,
            // since both series belong to the same metric family.
            assert_eq!(
                buffer.matches("# HELP consensus_votes").count(),
                1,
                "HELP should appear exactly once, got: {}",
                buffer
            );
            assert_eq!(
                buffer.matches("# TYPE consensus_votes").count(),
                1,
                "TYPE should appear exactly once, got: {}",
                buffer
            );

            // Multiple attributes: expected sorted by key (instance before region),
            // not insertion order.
            let ctx_multi = context
                .with_label("engine")
                .with_attribute("region", "us")
                .with_attribute("instance", "i1");
            let counter3 = Counter::<u64>::default();
            ctx_multi.register("requests", "request count", counter3.clone());
            counter3.inc();

            let buffer = context.encode();
            assert!(
                buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
                "Expected metric with sorted attributes, got: {}",
                buffer
            );
        });
    }
2347
2348    #[test]
2349    fn test_deterministic_metrics_with_attribute() {
2350        let executor = deterministic::Runner::default();
2351        test_metrics_with_attribute(executor);
2352    }
2353
2354    #[test]
2355    fn test_tokio_metrics_with_attribute() {
2356        let runner = tokio::Runner::default();
2357        test_metrics_with_attribute(runner);
2358    }
2359
2360    fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2361    where
2362        R::Context: Metrics,
2363    {
2364        runner.start(|context| async move {
2365            // Create context with attribute, then nest a label
2366            let ctx = context
2367                .with_label("orchestrator")
2368                .with_attribute("epoch", "e5")
2369                .with_label("engine");
2370
2371            // Register a metric
2372            let counter = Counter::<u64>::default();
2373            ctx.register("votes", "vote count", counter.clone());
2374            counter.inc();
2375
2376            // Verify the attribute is preserved through the nested label
2377            let buffer = context.encode();
2378            assert!(
2379                buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2380                "Expected metric with preserved epoch attribute, got: {}",
2381                buffer
2382            );
2383
2384            // Multiple levels of nesting with attributes at different levels
2385            let ctx2 = context
2386                .with_label("outer")
2387                .with_attribute("region", "us")
2388                .with_label("middle")
2389                .with_attribute("az", "east")
2390                .with_label("inner");
2391
2392            let counter2 = Counter::<u64>::default();
2393            ctx2.register("requests", "request count", counter2.clone());
2394            counter2.inc();
2395            counter2.inc();
2396
2397            let buffer = context.encode();
2398            assert!(
2399                buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2400                "Expected metric with all attributes preserved and sorted, got: {}",
2401                buffer
2402            );
2403        });
2404    }
2405
2406    #[test]
2407    fn test_deterministic_metrics_attribute_with_nested_label() {
2408        let executor = deterministic::Runner::default();
2409        test_metrics_attribute_with_nested_label(executor);
2410    }
2411
2412    #[test]
2413    fn test_tokio_metrics_attribute_with_nested_label() {
2414        let runner = tokio::Runner::default();
2415        test_metrics_attribute_with_nested_label(runner);
2416    }
2417
2418    fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2419    where
2420        R::Context: Metrics,
2421    {
2422        runner.start(|context| async move {
2423            // Create two separate sub-contexts, each with their own attribute
2424            let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
2425            let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);
2426
2427            // Register metrics in ctx_a
2428            let c1 = Counter::<u64>::default();
2429            ctx_a.register("requests", "help", c1);
2430
2431            // Register metrics in ctx_b
2432            let c2 = Counter::<u64>::default();
2433            ctx_b.register("requests", "help", c2);
2434
2435            // Register another metric in ctx_a AFTER ctx_b was used
2436            let c3 = Counter::<u64>::default();
2437            ctx_a.register("errors", "help", c3);
2438
2439            let output = context.encode();
2440
2441            // ctx_a metrics should only have epoch=1
2442            assert!(
2443                output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2444                "ctx_a requests should have epoch=1: {output}"
2445            );
2446            assert!(
2447                output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2448                "ctx_a errors should have epoch=1: {output}"
2449            );
2450            assert!(
2451                !output.contains("component_a_requests_total{epoch=\"2\"}"),
2452                "ctx_a requests should not have epoch=2: {output}"
2453            );
2454
2455            // ctx_b metrics should only have epoch=2
2456            assert!(
2457                output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2458                "ctx_b should have epoch=2: {output}"
2459            );
2460            assert!(
2461                !output.contains("component_b_requests_total{epoch=\"1\"}"),
2462                "ctx_b should not have epoch=1: {output}"
2463            );
2464        });
2465    }
2466
2467    #[test]
2468    fn test_deterministic_metrics_attributes_isolated_between_contexts() {
2469        let executor = deterministic::Runner::default();
2470        test_metrics_attributes_isolated_between_contexts(executor);
2471    }
2472
2473    #[test]
2474    fn test_tokio_metrics_attributes_isolated_between_contexts() {
2475        let runner = tokio::Runner::default();
2476        test_metrics_attributes_isolated_between_contexts(runner);
2477    }
2478
    /// Regression test for https://github.com/commonwarexyz/monorepo/issues/3485.
    ///
    /// Verifies the documented guarantee that runtime task metrics ignore context
    /// attributes: spawning with a varying `with_attribute` (as the marshaled
    /// consensus code does for each round) must not create per-value entries in
    /// `runtime_tasks_spawned` / `runtime_tasks_running`.
    fn test_metrics_spawn_attribute_cardinality<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        runner.start(|context| async move {
            const ROUNDS: u64 = 128;

            // Spawn ROUNDS tasks under the same label, each with a distinct
            // `round` attribute value (the high-cardinality case from the issue).
            let mut handles = Vec::with_capacity(ROUNDS as usize);
            for round in 0..ROUNDS {
                let handle = context
                    .with_label("deferred_verify")
                    .with_attribute("round", round)
                    .spawn(move |_| async move { round });
                handles.push(handle);
            }
            // Each task simply echoes its round number; confirm all completed.
            for (expected, handle) in handles.into_iter().enumerate() {
                assert_eq!(handle.await.expect("task failed"), expected as u64);
            }

            // handle.await resolves when the task's output is ready, but
            // the running-gauge decrement fires on task-struct drop which
            // may lag slightly. Yield to the executor so it can run cleanup.
            while count_running_tasks(&context, "deferred_verify") > 0 {
                context.sleep(Duration::from_millis(10)).await;
            }
            let buffer = context.encode();

            // Count occurrences of each runtime task metric for our label. If
            // attributes were incorrectly folded into the task family key, we
            // would see ROUNDS distinct time series instead of one.
            let spawned_lines = buffer
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_tasks_spawned_total{")
                        && line.contains("name=\"deferred_verify\"")
                })
                .count();
            let running_lines = buffer
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains("name=\"deferred_verify\"")
                })
                .count();
            assert_eq!(
                spawned_lines, 1,
                "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
            );
            assert_eq!(
                running_lines, 1,
                "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
            );

            // The single spawned-counter entry should reflect every round.
            let spawned_value = format!(
                "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
            );
            assert!(
                buffer.contains(&spawned_value),
                "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
            );
            // All tasks finished above, so the shared running gauge must be zero.
            let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
            assert!(
                buffer.contains(running_value),
                "expected running gauge to return to 0, got: {buffer}",
            );

            // The per-round attribute must not surface on task metrics (the
            // task `Label` does not include context attributes).
            assert!(
                !buffer
                    .lines()
                    .any(|line| line.starts_with("runtime_tasks_")
                        && line.contains("round=")),
                "task metrics must not carry `round` attribute: {buffer}",
            );
        });
    }
2563
2564    #[test]
2565    fn test_deterministic_metrics_spawn_attribute_cardinality() {
2566        let executor = deterministic::Runner::default();
2567        test_metrics_spawn_attribute_cardinality(executor);
2568    }
2569
2570    #[test]
2571    fn test_tokio_metrics_spawn_attribute_cardinality() {
2572        let runner = tokio::Runner::default();
2573        test_metrics_spawn_attribute_cardinality(runner);
2574    }
2575
    /// Regression test for https://github.com/commonwarexyz/monorepo/issues/3485.
    ///
    /// Verifies the documented guarantee that runtime task metrics ignore the
    /// scope of the spawning context: tasks spawned from a [`Metrics::with_scope`]
    /// context must still appear exactly once in the root-registry
    /// `runtime_tasks_spawned` / `runtime_tasks_running` families keyed only by
    /// `name` / `kind` / `execution`, and dropping the scope must neither remove
    /// those entries nor create additional ones.
    fn test_metrics_spawn_scope_cardinality<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        runner.start(|context| async move {
            const ROUNDS: u64 = 128;

            // Spawn each task from a freshly created (and immediately dropped)
            // scoped context carrying a per-round attribute.
            let mut handles = Vec::with_capacity(ROUNDS as usize);
            for round in 0..ROUNDS {
                let scoped = context
                    .with_label("deferred_verify")
                    .with_attribute("round", round)
                    .with_scope();
                let handle = scoped.spawn(move |_| async move { round });
                handles.push(handle);
                // `scoped` is dropped here; task metrics must persist regardless.
            }
            // Each task echoes its round number; confirm all completed.
            for (expected, handle) in handles.into_iter().enumerate() {
                assert_eq!(handle.await.expect("task failed"), expected as u64);
            }

            // handle.await resolves when the task's output is ready, but
            // the running-gauge decrement fires on task-struct drop which
            // may lag slightly. Yield to the executor so it can run cleanup.
            while count_running_tasks(&context, "deferred_verify") > 0 {
                context.sleep(Duration::from_millis(10)).await;
            }
            let buffer = context.encode();

            // Exactly one series per task metric family: scope churn must not
            // have removed or duplicated the entries.
            let spawned_lines = buffer
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_tasks_spawned_total{")
                        && line.contains("name=\"deferred_verify\"")
                })
                .count();
            let running_lines = buffer
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_tasks_running{")
                        && line.contains("name=\"deferred_verify\"")
                })
                .count();
            assert_eq!(
                spawned_lines, 1,
                "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
            );
            assert_eq!(
                running_lines, 1,
                "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
            );

            // The single spawned-counter entry should reflect every round.
            let spawned_value = format!(
                "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
            );
            assert!(
                buffer.contains(&spawned_value),
                "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
            );
            // All tasks finished above, so the shared running gauge must be zero.
            let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
            assert!(
                buffer.contains(running_value),
                "expected running gauge to return to 0, got: {buffer}",
            );
        });
    }
2650
2651    #[test]
2652    fn test_deterministic_metrics_spawn_scope_cardinality() {
2653        let executor = deterministic::Runner::default();
2654        test_metrics_spawn_scope_cardinality(executor);
2655    }
2656
2657    #[test]
2658    fn test_tokio_metrics_spawn_scope_cardinality() {
2659        let runner = tokio::Runner::default();
2660        test_metrics_spawn_scope_cardinality(runner);
2661    }
2662
2663    fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2664    where
2665        R::Context: Metrics,
2666    {
2667        runner.start(|context| async move {
2668            // Create two contexts with same attributes but different order
2669            let ctx_ab = context
2670                .with_label("service")
2671                .with_attribute("region", "us")
2672                .with_attribute("env", "prod");
2673
2674            let ctx_ba = context
2675                .with_label("service")
2676                .with_attribute("env", "prod")
2677                .with_attribute("region", "us");
2678
2679            // Register via first context
2680            let c1 = Counter::<u64>::default();
2681            ctx_ab.register("requests", "help", c1.clone());
2682            c1.inc();
2683
2684            // Register via second context - same attributes, different metric
2685            let c2 = Counter::<u64>::default();
2686            ctx_ba.register("errors", "help", c2.clone());
2687            c2.inc();
2688            c2.inc();
2689
2690            let output = context.encode();
2691
2692            // Both should have the same label order (alphabetically sorted: env, region)
2693            assert!(
2694                output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2695                "requests should have sorted labels: {output}"
2696            );
2697            assert!(
2698                output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2699                "errors should have sorted labels: {output}"
2700            );
2701
2702            // Should NOT have reverse order
2703            assert!(
2704                !output.contains("region=\"us\",env=\"prod\""),
2705                "should not have unsorted label order: {output}"
2706            );
2707        });
2708    }
2709
2710    #[test]
2711    fn test_deterministic_metrics_attributes_sorted_deterministically() {
2712        let executor = deterministic::Runner::default();
2713        test_metrics_attributes_sorted_deterministically(executor);
2714    }
2715
2716    #[test]
2717    fn test_tokio_metrics_attributes_sorted_deterministically() {
2718        let runner = tokio::Runner::default();
2719        test_metrics_attributes_sorted_deterministically(runner);
2720    }
2721
    /// Matrix test combining nested labels and attributes: plain, attributed,
    /// nested, and nested-plus-attributed contexts must each produce exactly
    /// the expected series, with no attribute leaking across sibling contexts
    /// or services.
    fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Service A: plain, no nested labels
            let svc_a = context.with_label("service_a");

            // Service A with attribute (same top-level label, different context)
            let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);

            // Service B with nested label: service_b_worker
            let svc_b_worker = context.with_label("service_b").with_label("worker");

            // Service B with nested label AND attribute
            let svc_b_worker_shard = context
                .with_label("service_b")
                .with_label("worker")
                .with_attribute("shard", 99);

            // Service B different nested label: service_b_manager
            let svc_b_manager = context.with_label("service_b").with_label("manager");

            // Service C: plain, proves no cross-service contamination
            let svc_c = context.with_label("service_c");

            // Register one counter per context (all left at their default of 0).
            let c1 = Counter::<u64>::default();
            svc_a.register("requests", "help", c1);

            let c2 = Counter::<u64>::default();
            svc_a_v2.register("requests", "help", c2);

            let c3 = Counter::<u64>::default();
            svc_b_worker.register("tasks", "help", c3);

            let c4 = Counter::<u64>::default();
            svc_b_worker_shard.register("tasks", "help", c4);

            let c5 = Counter::<u64>::default();
            svc_b_manager.register("decisions", "help", c5);

            let c6 = Counter::<u64>::default();
            svc_c.register("requests", "help", c6);

            let output = context.encode();

            // Service A plain and attributed both exist independently
            assert!(
                output.contains("service_a_requests_total 0"),
                "svc_a plain should exist: {output}"
            );
            assert!(
                output.contains("service_a_requests_total{version=\"2\"} 0"),
                "svc_a_v2 should have version=2: {output}"
            );

            // Service B worker: plain and attributed versions
            assert!(
                output.contains("service_b_worker_tasks_total 0"),
                "svc_b_worker plain should exist: {output}"
            );
            assert!(
                output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
                "svc_b_worker_shard should have shard=99: {output}"
            );

            // Service B manager: no attributes
            assert!(
                output.contains("service_b_manager_decisions_total 0"),
                "svc_b_manager should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_b_manager_decisions_total{"),
                "svc_b_manager should have no attributes at all: {output}"
            );

            // Service C: no attributes, no contamination
            assert!(
                output.contains("service_c_requests_total 0"),
                "svc_c should have no attributes: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{"),
                "svc_c should have no attributes at all: {output}"
            );

            // Cross-contamination checks: attributes stay with the context
            // that declared them.
            assert!(
                !output.contains("service_b_manager_decisions_total{shard="),
                "svc_b_manager should not have shard: {output}"
            );
            assert!(
                !output.contains("service_a_requests_total{shard="),
                "svc_a should not have shard: {output}"
            );
            assert!(
                !output.contains("service_c_requests_total{version="),
                "svc_c should not have version: {output}"
            );
        });
    }
2824
2825    #[test]
2826    fn test_deterministic_metrics_nested_labels_with_attributes() {
2827        let executor = deterministic::Runner::default();
2828        test_metrics_nested_labels_with_attributes(executor);
2829    }
2830
2831    #[test]
2832    fn test_tokio_metrics_nested_labels_with_attributes() {
2833        let runner = tokio::Runner::default();
2834        test_metrics_nested_labels_with_attributes(runner);
2835    }
2836
    /// Context attributes must merge with `Family` label sets: attributes are
    /// emitted first (alphabetically sorted), followed by the Family's own
    /// labels, and a Family registered without attributes stays untouched.
    fn test_metrics_family_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Per-request label set carried by the Family itself.
            #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
            struct RequestLabels {
                method: String,
                status: u16,
            }

            // Create context with attribute
            let ctx = context
                .with_label("api")
                .with_attribute("region", "us_east")
                .with_attribute("env", "prod");

            // Register a Family metric
            let requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx.register("requests", "HTTP requests", requests.clone());

            // Increment counters for different label combinations
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 200,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "POST".to_string(),
                    status: 201,
                })
                .inc();
            requests
                .get_or_create(&RequestLabels {
                    method: "GET".to_string(),
                    status: 404,
                })
                .inc();

            let output = context.encode();

            // Context attributes appear first (alphabetically sorted), then Family labels
            // Context attributes: env="prod", region="us_east"
            // Family labels: method, status
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
                ),
                "GET 200 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
                ),
                "POST 201 should have merged labels: {output}"
            );
            assert!(
                output.contains(
                    "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
                ),
                "GET 404 should have merged labels: {output}"
            );

            // Create another context WITHOUT attributes to verify isolation
            let ctx_plain = context.with_label("api_plain");
            let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
            ctx_plain.register("requests", "HTTP requests", plain_requests.clone());

            plain_requests
                .get_or_create(&RequestLabels {
                    method: "DELETE".to_string(),
                    status: 204,
                })
                .inc();

            let output = context.encode();

            // Plain context should have Family labels but no context attributes
            assert!(
                output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
                "plain DELETE should have only family labels: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{env="),
                "plain should not have env attribute: {output}"
            );
            assert!(
                !output.contains("api_plain_requests_total{region="),
                "plain should not have region attribute: {output}"
            );
        });
    }
2931
2932    #[test]
2933    fn test_deterministic_metrics_family_with_attributes() {
2934        let executor = deterministic::Runner::default();
2935        test_metrics_family_with_attributes(executor);
2936    }
2937
2938    #[test]
2939    fn test_tokio_metrics_family_with_attributes() {
2940        let runner = tokio::Runner::default();
2941        test_metrics_family_with_attributes(runner);
2942    }
2943
2944    fn test_with_scope_register_and_encode<R: Runner>(runner: R)
2945    where
2946        R::Context: Metrics,
2947    {
2948        runner.start(|context| async move {
2949            let scoped = context.with_label("engine").with_scope();
2950            let counter = Counter::<u64>::default();
2951            scoped.register("votes", "vote count", counter.clone());
2952            counter.inc();
2953
2954            let buffer = context.encode();
2955            assert!(
2956                buffer.contains("engine_votes_total 1"),
2957                "scoped metric should appear in encode: {buffer}"
2958            );
2959        });
2960    }
2961
2962    #[test]
2963    fn test_deterministic_with_scope_register_and_encode() {
2964        let executor = deterministic::Runner::default();
2965        test_with_scope_register_and_encode(executor);
2966    }
2967
2968    #[test]
2969    fn test_tokio_with_scope_register_and_encode() {
2970        let runner = tokio::Runner::default();
2971        test_with_scope_register_and_encode(runner);
2972    }
2973
2974    fn test_with_scope_drop_removes_metrics<R: Runner>(runner: R)
2975    where
2976        R::Context: Metrics,
2977    {
2978        runner.start(|context| async move {
2979            // Register a permanent metric
2980            let permanent = Counter::<u64>::default();
2981            context.with_label("permanent").register(
2982                "counter",
2983                "permanent counter",
2984                permanent.clone(),
2985            );
2986            permanent.inc();
2987
2988            // Register a scoped metric
2989            let scoped = context.with_label("engine").with_scope();
2990            let counter = Counter::<u64>::default();
2991            scoped.register("votes", "vote count", counter.clone());
2992            counter.inc();
2993
2994            // Both should appear
2995            let buffer = context.encode();
2996            assert!(buffer.contains("permanent_counter_total 1"));
2997            assert!(buffer.contains("engine_votes_total 1"));
2998
2999            // Drop the scoped context
3000            drop(scoped);
3001
3002            // Only permanent metric should remain
3003            let buffer = context.encode();
3004            assert!(
3005                buffer.contains("permanent_counter_total 1"),
3006                "permanent metric should survive scope drop: {buffer}"
3007            );
3008            assert!(
3009                !buffer.contains("engine_votes"),
3010                "scoped metric should be removed after scope drop: {buffer}"
3011            );
3012        });
3013    }
3014
3015    #[test]
3016    fn test_deterministic_with_scope_drop_removes_metrics() {
3017        let executor = deterministic::Runner::default();
3018        test_with_scope_drop_removes_metrics(executor);
3019    }
3020
3021    #[test]
3022    fn test_tokio_with_scope_drop_removes_metrics() {
3023        let runner = tokio::Runner::default();
3024        test_with_scope_drop_removes_metrics(runner);
3025    }
3026
    /// Verifies that `with_attribute` values surface as labels on scoped
    /// metrics, that HELP/TYPE header lines are deduplicated when two scopes
    /// share a metric name, and that dropping a scope removes only that
    /// scope's series from the encoded output.
    fn test_with_scope_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            // Simulate epoch lifecycle
            let epoch1 = context
                .with_label("engine")
                .with_attribute("epoch", 1)
                .with_scope();
            let c1 = Counter::<u64>::default();
            epoch1.register("votes", "vote count", c1.clone());
            c1.inc();

            // Second scope with the same label/metric name but a different
            // `epoch` attribute value.
            let epoch2 = context
                .with_label("engine")
                .with_attribute("epoch", 2)
                .with_scope();
            let c2 = Counter::<u64>::default();
            epoch2.register("votes", "vote count", c2.clone());
            c2.inc();
            c2.inc();

            // Both epochs visible
            let buffer = context.encode();
            assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));

            // HELP/TYPE lines should be deduplicated across scopes
            assert_eq!(
                buffer.matches("# HELP engine_votes").count(),
                1,
                "HELP should appear once: {buffer}"
            );
            assert_eq!(
                buffer.matches("# TYPE engine_votes").count(),
                1,
                "TYPE should appear once: {buffer}"
            );

            // Drop epoch 1 context
            drop(epoch1);
            let buffer = context.encode();
            assert!(
                !buffer.contains("epoch=\"1\""),
                "epoch 1 should be gone: {buffer}"
            );
            assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));

            // Drop epoch 2 context
            drop(epoch2);
            let buffer = context.encode();
            assert!(
                !buffer.contains("engine_votes"),
                "all epoch metrics should be gone: {buffer}"
            );
        });
    }
3085
3086    #[test]
3087    fn test_deterministic_with_scope_attributes() {
3088        let executor = deterministic::Runner::default();
3089        test_with_scope_attributes(executor);
3090    }
3091
3092    #[test]
3093    fn test_tokio_with_scope_attributes() {
3094        let runner = tokio::Runner::default();
3095        test_with_scope_attributes(runner);
3096    }
3097
3098    fn test_with_scope_inherits_on_with_label<R: Runner>(runner: R)
3099    where
3100        R::Context: Metrics,
3101    {
3102        runner.start(|context| async move {
3103            let scoped = context.with_label("engine").with_scope();
3104
3105            // Child context inherits scope
3106            let child = scoped.with_label("batcher");
3107            let counter = Counter::<u64>::default();
3108            child.register("msgs", "message count", counter.clone());
3109            counter.inc();
3110
3111            let buffer = context.encode();
3112            assert!(buffer.contains("engine_batcher_msgs_total 1"));
3113
3114            // Dropping all scoped contexts removes all metrics in the scope
3115            drop(child);
3116            drop(scoped);
3117            let buffer = context.encode();
3118            assert!(
3119                !buffer.contains("engine_batcher_msgs"),
3120                "child metric should be removed with scope: {buffer}"
3121            );
3122        });
3123    }
3124
3125    #[test]
3126    fn test_deterministic_with_scope_inherits_on_with_label() {
3127        let executor = deterministic::Runner::default();
3128        test_with_scope_inherits_on_with_label(executor);
3129    }
3130
3131    #[test]
3132    fn test_tokio_with_scope_inherits_on_with_label() {
3133        let runner = tokio::Runner::default();
3134        test_with_scope_inherits_on_with_label(runner);
3135    }
3136
3137    fn test_multiple_scopes<R: Runner>(runner: R)
3138    where
3139        R::Context: Metrics,
3140    {
3141        runner.start(|context| async move {
3142            let ctx_a = context.with_label("a").with_scope();
3143            let ctx_b = context.with_label("b").with_scope();
3144
3145            let ca = Counter::<u64>::default();
3146            ctx_a.register("counter", "a counter", ca.clone());
3147            ca.inc();
3148
3149            let cb = Counter::<u64>::default();
3150            ctx_b.register("counter", "b counter", cb.clone());
3151            cb.inc();
3152            cb.inc();
3153
3154            let buffer = context.encode();
3155            assert!(buffer.contains("a_counter_total 1"));
3156            assert!(buffer.contains("b_counter_total 2"));
3157
3158            // Drop only scope a
3159            drop(ctx_a);
3160            let buffer = context.encode();
3161            assert!(!buffer.contains("a_counter"));
3162            assert!(buffer.contains("b_counter_total 2"));
3163
3164            // Drop scope b
3165            drop(ctx_b);
3166            let buffer = context.encode();
3167            assert!(!buffer.contains("b_counter"));
3168        });
3169    }
3170
3171    #[test]
3172    fn test_deterministic_multiple_scopes() {
3173        let executor = deterministic::Runner::default();
3174        test_multiple_scopes(executor);
3175    }
3176
3177    #[test]
3178    fn test_tokio_multiple_scopes() {
3179        let runner = tokio::Runner::default();
3180        test_multiple_scopes(runner);
3181    }
3182
3183    fn test_encode_single_eof<R: Runner>(runner: R)
3184    where
3185        R::Context: Metrics,
3186    {
3187        runner.start(|context| async move {
3188            let root_counter = Counter::<u64>::default();
3189            context.register("root", "root metric", root_counter.clone());
3190            root_counter.inc();
3191
3192            let scoped = context.with_label("engine").with_scope();
3193            let scoped_counter = Counter::<u64>::default();
3194            scoped.register("ops", "scoped metric", scoped_counter.clone());
3195            scoped_counter.inc();
3196
3197            let buffer = context.encode();
3198            assert!(
3199                buffer.contains("root_total 1"),
3200                "root metric missing: {buffer}"
3201            );
3202            assert!(
3203                buffer.contains("engine_ops_total 1"),
3204                "scoped metric missing: {buffer}"
3205            );
3206            assert_eq!(
3207                buffer.matches("# EOF").count(),
3208                1,
3209                "expected exactly one EOF marker: {buffer}"
3210            );
3211            assert!(
3212                buffer.ends_with("# EOF\n"),
3213                "EOF must be the last line: {buffer}"
3214            );
3215        });
3216    }
3217
3218    #[test]
3219    fn test_deterministic_encode_single_eof() {
3220        let executor = deterministic::Runner::default();
3221        test_encode_single_eof(executor);
3222    }
3223
3224    #[test]
3225    fn test_tokio_encode_single_eof() {
3226        let runner = tokio::Runner::default();
3227        test_encode_single_eof(runner);
3228    }
3229
    /// Calling `with_scope` on an already-scoped context must reuse (inherit)
    /// the existing scope rather than open a fresh one: metrics registered in
    /// the nested context survive until the LAST context holding the scope is
    /// dropped.
    fn test_with_scope_nested_inherits<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        runner.start(|context| async move {
            let scoped = context.with_label("engine").with_scope();

            // Calling with_scope() on an already-scoped context inherits the scope
            let nested = scoped.with_scope();
            let counter = Counter::<u64>::default();
            nested.register("votes", "vote count", counter.clone());
            counter.inc();

            let buffer = context.encode();
            assert!(
                buffer.contains("engine_votes_total 1"),
                "nested scope should inherit parent scope: {buffer}"
            );

            // Dropping the nested context alone should NOT clean up metrics
            // because the parent scoped context still holds the Arc
            drop(nested);
            let buffer = context.encode();
            assert!(
                buffer.contains("engine_votes_total 1"),
                "metrics should survive as long as any scope clone exists: {buffer}"
            );

            // Dropping the parent scoped context cleans up
            drop(scoped);
            let buffer = context.encode();
            assert!(
                !buffer.contains("engine_votes"),
                "metrics should be removed when all scope clones are dropped: {buffer}"
            );
        });
    }
3267
3268    #[test]
3269    fn test_deterministic_with_scope_nested_inherits() {
3270        let executor = deterministic::Runner::default();
3271        test_with_scope_nested_inherits(executor);
3272    }
3273
3274    #[test]
3275    fn test_tokio_with_scope_nested_inherits() {
3276        let runner = tokio::Runner::default();
3277        test_with_scope_nested_inherits(runner);
3278    }
3279
    /// Re-registering the same fully-qualified metric key after its scope was
    /// dropped must be rejected with a `duplicate metric:` panic — dropping a
    /// scope does not free the name for re-registration.
    #[test]
    #[should_panic(expected = "duplicate metric:")]
    fn test_deterministic_reregister_after_scope_drop() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let scoped = context
                .with_label("engine")
                .with_attribute("epoch", 1)
                .with_scope();
            let c1 = Counter::<u64>::default();
            scoped.register("votes", "vote count", c1);
            drop(scoped);

            // Re-registering the same key after scope drop is not allowed
            let scoped2 = context
                .with_label("engine")
                .with_attribute("epoch", 1)
                .with_scope();
            let c2 = Counter::<u64>::default();
            // Expected to panic here with "duplicate metric:".
            scoped2.register("votes", "vote count", c2);
        });
    }
3302
    /// Verifies that a `Family` registered inside a scope combines the scope's
    /// `with_attribute` labels with each member's own label set in the encoded
    /// output, and that dropping the scope removes every series of the family.
    fn test_with_scope_family_with_attributes<R: Runner>(runner: R)
    where
        R::Context: Metrics,
    {
        // Minimal label set that encodes a single `peer="<name>"` label.
        #[derive(Clone, Debug, Hash, PartialEq, Eq)]
        struct Peer {
            name: String,
        }
        impl EncodeLabelSet for Peer {
            fn encode(
                &self,
                encoder: &mut prometheus_client::encoding::LabelSetEncoder<'_>,
            ) -> Result<(), std::fmt::Error> {
                // The encoder API is staged: key, then value, then finish.
                let mut label = encoder.encode_label();
                let mut key = label.encode_label_key()?;
                EncodeLabelKey::encode(&"peer", &mut key)?;
                let mut value = key.encode_label_value()?;
                EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
                value.finish()
            }
        }

        runner.start(|context| async move {
            let scoped = context
                .with_label("batcher")
                .with_attribute("epoch", 1)
                .with_scope();

            let family: Family<Peer, Counter> = Family::default();
            scoped.register("votes", "votes per peer", family.clone());
            family
                .get_or_create(&Peer {
                    name: "alice".into(),
                })
                .inc();
            family.get_or_create(&Peer { name: "bob".into() }).inc();

            // Scope attribute (`epoch`) and family label (`peer`) both appear.
            let buffer = context.encode();
            assert!(
                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
                "family with attributes should combine labels: {buffer}"
            );
            assert!(
                buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
                "family with attributes should combine labels: {buffer}"
            );

            // Dropping the scope removes the whole family, not just one series.
            drop(scoped);
            let buffer = context.encode();
            assert!(
                !buffer.contains("batcher_votes"),
                "family metrics should be removed: {buffer}"
            );
        });
    }
3358
3359    #[test]
3360    fn test_deterministic_with_scope_family_with_attributes() {
3361        let executor = deterministic::Runner::default();
3362        test_with_scope_family_with_attributes(executor);
3363    }
3364
3365    #[test]
3366    fn test_tokio_with_scope_family_with_attributes() {
3367        let runner = tokio::Runner::default();
3368        test_with_scope_family_with_attributes(runner);
3369    }
3370
3371    #[test]
3372    fn test_deterministic_future() {
3373        let runner = deterministic::Runner::default();
3374        test_error_future(runner);
3375    }
3376
3377    #[test]
3378    fn test_deterministic_clock_sleep() {
3379        let executor = deterministic::Runner::default();
3380        test_clock_sleep(executor);
3381    }
3382
3383    #[test]
3384    fn test_deterministic_clock_sleep_until() {
3385        let executor = deterministic::Runner::default();
3386        test_clock_sleep_until(executor);
3387    }
3388
3389    #[test]
3390    fn test_deterministic_clock_sleep_until_far_future() {
3391        let executor = deterministic::Runner::default();
3392        test_clock_sleep_until_far_future(executor);
3393    }
3394
3395    #[test]
3396    fn test_deterministic_clock_timeout() {
3397        let executor = deterministic::Runner::default();
3398        test_clock_timeout(executor);
3399    }
3400
3401    #[test]
3402    fn test_deterministic_root_finishes() {
3403        let executor = deterministic::Runner::default();
3404        test_root_finishes(executor);
3405    }
3406
3407    #[test]
3408    fn test_deterministic_spawn_after_abort() {
3409        let executor = deterministic::Runner::default();
3410        test_spawn_after_abort(executor);
3411    }
3412
3413    #[test]
3414    fn test_deterministic_spawn_abort() {
3415        let executor = deterministic::Runner::default();
3416        test_spawn_abort(executor, false, false);
3417    }
3418
3419    #[test]
3420    #[should_panic(expected = "blah")]
3421    fn test_deterministic_panic_aborts_root() {
3422        let runner = deterministic::Runner::default();
3423        test_panic_aborts_root(runner);
3424    }
3425
3426    #[test]
3427    #[should_panic(expected = "blah")]
3428    fn test_deterministic_panic_aborts_root_caught() {
3429        let cfg = deterministic::Config::default().with_catch_panics(true);
3430        let runner = deterministic::Runner::new(cfg);
3431        test_panic_aborts_root(runner);
3432    }
3433
3434    #[test]
3435    #[should_panic(expected = "blah")]
3436    fn test_deterministic_panic_aborts_spawn() {
3437        let executor = deterministic::Runner::default();
3438        test_panic_aborts_spawn(executor);
3439    }
3440
3441    #[test]
3442    fn test_deterministic_panic_aborts_spawn_caught() {
3443        let cfg = deterministic::Config::default().with_catch_panics(true);
3444        let executor = deterministic::Runner::new(cfg);
3445        test_panic_aborts_spawn_caught(executor);
3446    }
3447
3448    #[test]
3449    #[should_panic(expected = "boom")]
3450    fn test_deterministic_multiple_panics() {
3451        let executor = deterministic::Runner::default();
3452        test_multiple_panics(executor);
3453    }
3454
3455    #[test]
3456    fn test_deterministic_multiple_panics_caught() {
3457        let cfg = deterministic::Config::default().with_catch_panics(true);
3458        let executor = deterministic::Runner::new(cfg);
3459        test_multiple_panics_caught(executor);
3460    }
3461
3462    #[test]
3463    fn test_deterministic_select() {
3464        let executor = deterministic::Runner::default();
3465        test_select(executor);
3466    }
3467
3468    #[test]
3469    fn test_deterministic_select_loop() {
3470        let executor = deterministic::Runner::default();
3471        test_select_loop(executor);
3472    }
3473
3474    #[test]
3475    fn test_deterministic_storage_operations() {
3476        let executor = deterministic::Runner::default();
3477        test_storage_operations(executor);
3478    }
3479
3480    #[test]
3481    fn test_deterministic_blob_read_write() {
3482        let executor = deterministic::Runner::default();
3483        test_blob_read_write(executor);
3484    }
3485
3486    #[test]
3487    fn test_deterministic_blob_resize() {
3488        let executor = deterministic::Runner::default();
3489        test_blob_resize(executor);
3490    }
3491
3492    #[test]
3493    fn test_deterministic_many_partition_read_write() {
3494        let executor = deterministic::Runner::default();
3495        test_many_partition_read_write(executor);
3496    }
3497
3498    #[test]
3499    fn test_deterministic_blob_read_past_length() {
3500        let executor = deterministic::Runner::default();
3501        test_blob_read_past_length(executor);
3502    }
3503
3504    #[test]
3505    fn test_deterministic_blob_clone_and_concurrent_read() {
3506        // Run test
3507        let executor = deterministic::Runner::default();
3508        test_blob_clone_and_concurrent_read(executor);
3509    }
3510
3511    #[test]
3512    fn test_deterministic_shutdown() {
3513        let executor = deterministic::Runner::default();
3514        test_shutdown(executor);
3515    }
3516
3517    #[test]
3518    fn test_deterministic_shutdown_multiple_signals() {
3519        let executor = deterministic::Runner::default();
3520        test_shutdown_multiple_signals(executor);
3521    }
3522
3523    #[test]
3524    fn test_deterministic_shutdown_timeout() {
3525        let executor = deterministic::Runner::default();
3526        test_shutdown_timeout(executor);
3527    }
3528
3529    #[test]
3530    fn test_deterministic_shutdown_multiple_stop_calls() {
3531        let executor = deterministic::Runner::default();
3532        test_shutdown_multiple_stop_calls(executor);
3533    }
3534
3535    #[test]
3536    fn test_deterministic_unfulfilled_shutdown() {
3537        let executor = deterministic::Runner::default();
3538        test_unfulfilled_shutdown(executor);
3539    }
3540
3541    #[test]
3542    fn test_deterministic_spawn_dedicated() {
3543        let executor = deterministic::Runner::default();
3544        test_spawn_dedicated(executor);
3545    }
3546
3547    #[test]
3548    fn test_deterministic_spawn() {
3549        let runner = deterministic::Runner::default();
3550        test_spawn(runner);
3551    }
3552
3553    #[test]
3554    fn test_deterministic_spawn_abort_on_parent_abort() {
3555        let runner = deterministic::Runner::default();
3556        test_spawn_abort_on_parent_abort(runner);
3557    }
3558
3559    #[test]
3560    fn test_deterministic_spawn_abort_on_parent_completion() {
3561        let runner = deterministic::Runner::default();
3562        test_spawn_abort_on_parent_completion(runner);
3563    }
3564
3565    #[test]
3566    fn test_deterministic_spawn_cascading_abort() {
3567        let runner = deterministic::Runner::default();
3568        test_spawn_cascading_abort(runner);
3569    }
3570
3571    #[test]
3572    fn test_deterministic_child_survives_sibling_completion() {
3573        let runner = deterministic::Runner::default();
3574        test_child_survives_sibling_completion(runner);
3575    }
3576
3577    #[test]
3578    fn test_deterministic_spawn_clone_chain() {
3579        let runner = deterministic::Runner::default();
3580        test_spawn_clone_chain(runner);
3581    }
3582
3583    #[test]
3584    fn test_deterministic_spawn_sparse_clone_chain() {
3585        let runner = deterministic::Runner::default();
3586        test_spawn_sparse_clone_chain(runner);
3587    }
3588
3589    #[test]
3590    fn test_deterministic_spawn_blocking() {
3591        for dedicated in [false, true] {
3592            let executor = deterministic::Runner::default();
3593            test_spawn_blocking(executor, dedicated);
3594        }
3595    }
3596
3597    #[test]
3598    #[should_panic(expected = "blocking task panicked")]
3599    fn test_deterministic_spawn_blocking_panic() {
3600        for dedicated in [false, true] {
3601            let executor = deterministic::Runner::default();
3602            test_spawn_blocking_panic(executor, dedicated);
3603        }
3604    }
3605
3606    #[test]
3607    fn test_deterministic_spawn_blocking_panic_caught() {
3608        for dedicated in [false, true] {
3609            let cfg = deterministic::Config::default().with_catch_panics(true);
3610            let executor = deterministic::Runner::new(cfg);
3611            test_spawn_blocking_panic_caught(executor, dedicated);
3612        }
3613    }
3614
3615    #[test]
3616    fn test_deterministic_spawn_blocking_abort() {
3617        for (dedicated, blocking) in [(false, true), (true, false)] {
3618            let executor = deterministic::Runner::default();
3619            test_spawn_abort(executor, dedicated, blocking);
3620        }
3621    }
3622
3623    #[test]
3624    fn test_deterministic_circular_reference_prevents_cleanup() {
3625        let executor = deterministic::Runner::default();
3626        test_circular_reference_prevents_cleanup(executor);
3627    }
3628
3629    #[test]
3630    fn test_deterministic_late_waker() {
3631        let executor = deterministic::Runner::default();
3632        test_late_waker(executor);
3633    }
3634
3635    #[test]
3636    fn test_deterministic_metrics() {
3637        let executor = deterministic::Runner::default();
3638        test_metrics(executor);
3639    }
3640
    /// Spawned tasks labeled via `with_span` should emit a tracing span named
    /// `task` carrying the accumulated label as its `name` field; nested
    /// labels are joined with `_` (the inner task's span is `test_inner`).
    #[test_collect_traces]
    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
        let executor = deterministic::Runner::new(deterministic::Config::default());
        executor.start(|context| async move {
            context
                .with_label("test")
                .with_span()
                .spawn(|context| async move {
                    tracing::info!(field = "test field", "test log");

                    context
                        .with_label("inner")
                        .with_span()
                        .spawn(|_| async move {
                            tracing::info!("inner log");
                        })
                        .await
                        .unwrap();
                })
                .await
                .unwrap();
        });

        // Exactly the two INFO events emitted above should have been captured.
        let info_traces = traces.get_by_level(Level::INFO);
        assert_eq!(info_traces.len(), 2);

        // Outer log (single span)
        info_traces
            .expect_event_at_index(0, |event| {
                event.metadata.expect_content_exact("test log")?;
                event.metadata.expect_field_count(1)?;
                event.metadata.expect_field_exact("field", "test field")?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test")
                })
            })
            .unwrap();

        // Inner log: still a single span, but with the combined label.
        info_traces
            .expect_event_at_index(1, |event| {
                event.metadata.expect_content_exact("inner log")?;
                event.metadata.expect_field_count(0)?;
                event.expect_span_count(1)?;
                event.expect_span_at_index(0, |span| {
                    span.expect_content_exact("task")?;
                    span.expect_field_count(1)?;
                    span.expect_field_exact("name", "test_inner")
                })
            })
            .unwrap();
    }
3695
3696    #[test]
3697    fn test_deterministic_resolver() {
3698        let executor = deterministic::Runner::default();
3699        executor.start(|context| async move {
3700            // Register DNS mappings
3701            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3702            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
3703            context.resolver_register("example.com", Some(vec![ip1, ip2]));
3704
3705            // Resolve registered hostname
3706            let addrs = context.resolve("example.com").await.unwrap();
3707            assert_eq!(addrs, vec![ip1, ip2]);
3708
3709            // Resolve unregistered hostname
3710            let result = context.resolve("unknown.com").await;
3711            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3712
3713            // Remove mapping
3714            context.resolver_register("example.com", None);
3715            let result = context.resolve("example.com").await;
3716            assert!(matches!(result, Err(Error::ResolveFailed(_))));
3717        });
3718    }
3719
3720    #[test]
3721    fn test_tokio_error_future() {
3722        let runner = tokio::Runner::default();
3723        test_error_future(runner);
3724    }
3725
3726    #[test]
3727    fn test_tokio_clock_sleep() {
3728        let executor = tokio::Runner::default();
3729        test_clock_sleep(executor);
3730    }
3731
3732    #[test]
3733    fn test_tokio_clock_sleep_until() {
3734        let executor = tokio::Runner::default();
3735        test_clock_sleep_until(executor);
3736    }
3737
3738    #[test]
3739    fn test_tokio_clock_sleep_until_far_future() {
3740        let executor = tokio::Runner::default();
3741        test_clock_sleep_until_far_future(executor);
3742    }
3743
3744    #[test]
3745    fn test_tokio_clock_timeout() {
3746        let executor = tokio::Runner::default();
3747        test_clock_timeout(executor);
3748    }
3749
3750    #[test]
3751    fn test_tokio_root_finishes() {
3752        let executor = tokio::Runner::default();
3753        test_root_finishes(executor);
3754    }
3755
3756    #[test]
3757    fn test_tokio_spawn_after_abort() {
3758        let executor = tokio::Runner::default();
3759        test_spawn_after_abort(executor);
3760    }
3761
3762    #[test]
3763    fn test_tokio_spawn_abort() {
3764        let executor = tokio::Runner::default();
3765        test_spawn_abort(executor, false, false);
3766    }
3767
3768    #[test]
3769    #[should_panic(expected = "blah")]
3770    fn test_tokio_panic_aborts_root() {
3771        let executor = tokio::Runner::default();
3772        test_panic_aborts_root(executor);
3773    }
3774
3775    #[test]
3776    #[should_panic(expected = "blah")]
3777    fn test_tokio_panic_aborts_root_caught() {
3778        let cfg = tokio::Config::default().with_catch_panics(true);
3779        let executor = tokio::Runner::new(cfg);
3780        test_panic_aborts_root(executor);
3781    }
3782
3783    #[test]
3784    #[should_panic(expected = "blah")]
3785    fn test_tokio_panic_aborts_spawn() {
3786        let executor = tokio::Runner::default();
3787        test_panic_aborts_spawn(executor);
3788    }
3789
3790    #[test]
3791    fn test_tokio_panic_aborts_spawn_caught() {
3792        let cfg = tokio::Config::default().with_catch_panics(true);
3793        let executor = tokio::Runner::new(cfg);
3794        test_panic_aborts_spawn_caught(executor);
3795    }
3796
3797    #[test]
3798    #[should_panic(expected = "boom")]
3799    fn test_tokio_multiple_panics() {
3800        let executor = tokio::Runner::default();
3801        test_multiple_panics(executor);
3802    }
3803
3804    #[test]
3805    fn test_tokio_multiple_panics_caught() {
3806        let cfg = tokio::Config::default().with_catch_panics(true);
3807        let executor = tokio::Runner::new(cfg);
3808        test_multiple_panics_caught(executor);
3809    }
3810
3811    #[test]
3812    fn test_tokio_select() {
3813        let executor = tokio::Runner::default();
3814        test_select(executor);
3815    }
3816
3817    #[test]
3818    fn test_tokio_select_loop() {
3819        let executor = tokio::Runner::default();
3820        test_select_loop(executor);
3821    }
3822
3823    #[test]
3824    fn test_tokio_storage_operations() {
3825        let executor = tokio::Runner::default();
3826        test_storage_operations(executor);
3827    }
3828
3829    #[test]
3830    fn test_tokio_blob_read_write() {
3831        let executor = tokio::Runner::default();
3832        test_blob_read_write(executor);
3833    }
3834
3835    #[test]
3836    fn test_tokio_blob_resize() {
3837        let executor = tokio::Runner::default();
3838        test_blob_resize(executor);
3839    }
3840
3841    #[test]
3842    fn test_tokio_many_partition_read_write() {
3843        let executor = tokio::Runner::default();
3844        test_many_partition_read_write(executor);
3845    }
3846
3847    #[test]
3848    fn test_tokio_blob_read_past_length() {
3849        let executor = tokio::Runner::default();
3850        test_blob_read_past_length(executor);
3851    }
3852
3853    #[test]
3854    fn test_tokio_blob_clone_and_concurrent_read() {
3855        // Run test
3856        let executor = tokio::Runner::default();
3857        test_blob_clone_and_concurrent_read(executor);
3858    }
3859
3860    #[test]
3861    fn test_tokio_shutdown() {
3862        let executor = tokio::Runner::default();
3863        test_shutdown(executor);
3864    }
3865
3866    #[test]
3867    fn test_tokio_shutdown_multiple_signals() {
3868        let executor = tokio::Runner::default();
3869        test_shutdown_multiple_signals(executor);
3870    }
3871
3872    #[test]
3873    fn test_tokio_shutdown_timeout() {
3874        let executor = tokio::Runner::default();
3875        test_shutdown_timeout(executor);
3876    }
3877
3878    #[test]
3879    fn test_tokio_shutdown_multiple_stop_calls() {
3880        let executor = tokio::Runner::default();
3881        test_shutdown_multiple_stop_calls(executor);
3882    }
3883
3884    #[test]
3885    fn test_tokio_unfulfilled_shutdown() {
3886        let executor = tokio::Runner::default();
3887        test_unfulfilled_shutdown(executor);
3888    }
3889
3890    #[test]
3891    fn test_tokio_spawn_dedicated() {
3892        let executor = tokio::Runner::default();
3893        test_spawn_dedicated(executor);
3894    }
3895
3896    #[test]
3897    fn test_tokio_spawn() {
3898        let runner = tokio::Runner::default();
3899        test_spawn(runner);
3900    }
3901
3902    #[test]
3903    fn test_tokio_spawn_abort_on_parent_abort() {
3904        let runner = tokio::Runner::default();
3905        test_spawn_abort_on_parent_abort(runner);
3906    }
3907
3908    #[test]
3909    fn test_tokio_spawn_abort_on_parent_completion() {
3910        let runner = tokio::Runner::default();
3911        test_spawn_abort_on_parent_completion(runner);
3912    }
3913
3914    #[test]
3915    fn test_tokio_spawn_cascading_abort() {
3916        let runner = tokio::Runner::default();
3917        test_spawn_cascading_abort(runner);
3918    }
3919
3920    #[test]
3921    fn test_tokio_child_survives_sibling_completion() {
3922        let runner = tokio::Runner::default();
3923        test_child_survives_sibling_completion(runner);
3924    }
3925
3926    #[test]
3927    fn test_tokio_spawn_clone_chain() {
3928        let runner = tokio::Runner::default();
3929        test_spawn_clone_chain(runner);
3930    }
3931
3932    #[test]
3933    fn test_tokio_spawn_sparse_clone_chain() {
3934        let runner = tokio::Runner::default();
3935        test_spawn_sparse_clone_chain(runner);
3936    }
3937
3938    #[test]
3939    fn test_tokio_spawn_blocking() {
3940        for dedicated in [false, true] {
3941            let executor = tokio::Runner::default();
3942            test_spawn_blocking(executor, dedicated);
3943        }
3944    }
3945
3946    #[test]
3947    #[should_panic(expected = "blocking task panicked")]
3948    fn test_tokio_spawn_blocking_panic() {
3949        for dedicated in [false, true] {
3950            let executor = tokio::Runner::default();
3951            test_spawn_blocking_panic(executor, dedicated);
3952        }
3953    }
3954
3955    #[test]
3956    fn test_tokio_spawn_blocking_panic_caught() {
3957        for dedicated in [false, true] {
3958            let cfg = tokio::Config::default().with_catch_panics(true);
3959            let executor = tokio::Runner::new(cfg);
3960            test_spawn_blocking_panic_caught(executor, dedicated);
3961        }
3962    }
3963
3964    #[test]
3965    fn test_tokio_spawn_blocking_abort() {
3966        for (dedicated, blocking) in [(false, true), (true, false)] {
3967            let executor = tokio::Runner::default();
3968            test_spawn_abort(executor, dedicated, blocking);
3969        }
3970    }
3971
3972    #[test]
3973    fn test_tokio_circular_reference_prevents_cleanup() {
3974        let executor = tokio::Runner::default();
3975        test_circular_reference_prevents_cleanup(executor);
3976    }
3977
3978    #[test]
3979    fn test_tokio_late_waker() {
3980        let executor = tokio::Runner::default();
3981        test_late_waker(executor);
3982    }
3983
3984    #[test]
3985    fn test_tokio_metrics() {
3986        let executor = tokio::Runner::default();
3987        test_metrics(executor);
3988    }
3989
    /// Polls the encoded metrics until a `runtime_process_rss` series appears
    /// with a positive value, then returns.
    ///
    /// NOTE(review): if the metric never becomes positive this loop spins
    /// forever — it relies on the surrounding test harness/CI timeout to fail.
    #[test]
    fn test_tokio_process_rss_metric() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            loop {
                // Wait for RSS metric to be available
                let metrics = context.encode();
                if !metrics.contains("runtime_process_rss") {
                    context.sleep(Duration::from_millis(100)).await;
                    continue;
                }

                // Verify the RSS value is eventually populated (greater than 0)
                for line in metrics.lines() {
                    // Select sample lines for the unlabeled series; the prefix
                    // check would also match any metric whose name merely
                    // starts with "runtime_process_rss" — assumed unique here.
                    if line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                    {
                        let parts: Vec<&str> = line.split_whitespace().collect();
                        if parts.len() >= 2 {
                            // Assumes the value is encoded as an integer;
                            // a float rendering would panic here.
                            let rss_value: i64 =
                                parts[1].parse().expect("Failed to parse RSS value");
                            if rss_value > 0 {
                                return;
                            }
                        }
                    }
                }
            }
        });
    }
4020
    #[test]
    fn test_tokio_telemetry() {
        let executor = tokio::Runner::default();
        executor.start(|context| async move {
            // Define the server address the telemetry endpoint will bind to
            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

            // Configure telemetry: INFO-level plaintext logging plus a
            // metrics HTTP endpoint served at `address`
            tokio::telemetry::init(
                context.with_label("metrics"),
                tokio::telemetry::Logging {
                    level: Level::INFO,
                    json: false,
                },
                Some(address),
                None,
            );

            // Register a test metric and bump it once so it appears in the
            // scraped output with value 1
            let counter: Counter<u64> = Counter::default();
            context.register("test_counter", "Test counter", counter.clone());
            counter.inc();

            // Helper functions to parse HTTP response

            /// Read one CRLF-terminated line from `stream`, one byte at a
            /// time, returning it without the trailing `\r\n`.
            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
                let mut line = Vec::new();
                loop {
                    let received = stream.recv(1).await?;
                    let byte = received.coalesce().as_ref()[0];
                    if byte == b'\n' {
                        if line.last() == Some(&b'\r') {
                            line.pop(); // Remove trailing \r
                        }
                        break;
                    }
                    line.push(byte);
                }
                String::from_utf8(line).map_err(|_| Error::ReadFailed)
            }

            /// Read HTTP headers until the blank line that terminates them,
            /// returning a name -> value map. Lines without a `": "`
            /// separator are silently skipped.
            async fn read_headers<St: Stream>(
                stream: &mut St,
            ) -> Result<HashMap<String, String>, Error> {
                let mut headers = HashMap::new();
                loop {
                    let line = read_line(stream).await?;
                    if line.is_empty() {
                        break;
                    }
                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
                    if parts.len() == 2 {
                        headers.insert(parts[0].to_string(), parts[1].to_string());
                    }
                }
                Ok(headers)
            }

            /// Read exactly `content_length` bytes of body and decode them
            /// as UTF-8.
            async fn read_body<St: Stream>(
                stream: &mut St,
                content_length: usize,
            ) -> Result<String, Error> {
                let received = stream.recv(content_length).await?;
                String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
            }

            // Simulate a client connecting to the server
            let client_handle = context
                .with_label("client")
                .spawn(move |context| async move {
                    // Retry until the telemetry server starts listening
                    let (mut sink, mut stream) = loop {
                        match context.dial(address).await {
                            Ok((sink, stream)) => break (sink, stream),
                            Err(e) => {
                                // The client may be polled before the server is ready, that's alright!
                                error!(err =?e, "failed to connect");
                                context.sleep(Duration::from_millis(10)).await;
                            }
                        }
                    };

                    // Send a GET request to the server
                    let request = format!(
                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                    );
                    sink.send(Bytes::from(request)).await.unwrap();

                    // Read and verify the HTTP status line
                    let status_line = read_line(&mut stream).await.unwrap();
                    assert_eq!(status_line, "HTTP/1.1 200 OK");

                    // Read and parse headers
                    let headers = read_headers(&mut stream).await.unwrap();
                    println!("Headers: {headers:?}");
                    // NOTE(review): assumes the server emits the header name
                    // in lowercase — confirm against the telemetry server.
                    let content_length = headers
                        .get("content-length")
                        .unwrap()
                        .parse::<usize>()
                        .unwrap();

                    // Read and verify the body contains our incremented counter
                    let body = read_body(&mut stream, content_length).await.unwrap();
                    assert!(body.contains("test_counter_total 1"));
                });

            // Wait for the client task to complete
            client_handle.await.unwrap();
        });
    }
4129
4130    #[test]
4131    fn test_tokio_resolver() {
4132        let executor = tokio::Runner::default();
4133        executor.start(|context| async move {
4134            let addrs = context.resolve("localhost").await.unwrap();
4135            assert!(!addrs.is_empty());
4136            for addr in addrs {
4137                assert!(
4138                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
4139                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
4140                );
4141            }
4142        });
4143    }
4144
4145    #[test]
4146    fn test_create_thread_pool_tokio() {
4147        let executor = tokio::Runner::default();
4148        executor.start(|context| async move {
4149            // Create a thread pool with 4 threads
4150            let pool = context
4151                .with_label("pool")
4152                .create_thread_pool(NZUsize!(4))
4153                .unwrap();
4154
4155            // Create a vector of numbers
4156            let v: Vec<_> = (0..10000).collect();
4157
4158            // Use the thread pool to sum the numbers
4159            pool.install(|| {
4160                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
4161            });
4162        });
4163    }
4164
4165    #[test]
4166    fn test_create_thread_pool_deterministic() {
4167        let executor = deterministic::Runner::default();
4168        executor.start(|context| async move {
4169            // Create a thread pool with 4 threads
4170            let pool = context
4171                .with_label("pool")
4172                .create_thread_pool(NZUsize!(4))
4173                .unwrap();
4174
4175            // Create a vector of numbers
4176            let v: Vec<_> = (0..10000).collect();
4177
4178            // Use the thread pool to sum the numbers
4179            pool.install(|| {
4180                assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
4181            });
4182        });
4183    }
4184
4185    fn test_buffer_pooler<R: Runner>(
4186        runner: R,
4187        expected_network_max_per_class: usize,
4188        expected_storage_max_per_class: usize,
4189    ) where
4190        R::Context: BufferPooler,
4191    {
4192        runner.start(|context| async move {
4193            // Verify network pool is accessible and works (cache-line aligned)
4194            let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
4195            assert!(net_buf.capacity() >= 1024);
4196
4197            // Verify storage pool is accessible and works (page-aligned)
4198            let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
4199            assert!(storage_buf.capacity() >= 4096);
4200
4201            // Verify pools have expected configurations
4202            assert_eq!(
4203                context.network_buffer_pool().config().max_per_class.get(),
4204                expected_network_max_per_class
4205            );
4206            assert_eq!(
4207                context.storage_buffer_pool().config().max_per_class.get(),
4208                expected_storage_max_per_class
4209            );
4210        });
4211    }
4212
4213    #[test]
4214    fn test_deterministic_buffer_pooler() {
4215        test_buffer_pooler(deterministic::Runner::default(), 4096, 64);
4216
4217        let runner = deterministic::Runner::new(
4218            deterministic::Config::default()
4219                .with_network_buffer_pool_config(
4220                    BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
4221                )
4222                .with_storage_buffer_pool_config(
4223                    BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
4224                ),
4225        );
4226        test_buffer_pooler(runner, 64, 8);
4227    }
4228
4229    #[test]
4230    fn test_tokio_buffer_pooler() {
4231        test_buffer_pooler(tokio::Runner::default(), 4096, 64);
4232
4233        let runner = tokio::Runner::new(
4234            tokio::Config::default()
4235                .with_network_buffer_pool_config(
4236                    BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
4237                )
4238                .with_storage_buffer_pool_config(
4239                    BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
4240                ),
4241        );
4242        test_buffer_pooler(runner, 64, 8);
4243    }
4244}