commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20#![doc(
21    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22    html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_macros::select;
26use commonware_utils::StableBuf;
27use prometheus_client::registry::Metric;
28use std::{
29    future::Future,
30    io::Error as IoError,
31    net::SocketAddr,
32    time::{Duration, SystemTime},
33};
34use thiserror::Error;
35
36#[macro_use]
37mod macros;
38
39pub mod deterministic;
40pub mod mocks;
41cfg_if::cfg_if! {
42    if #[cfg(not(target_arch = "wasm32"))] {
43        pub mod tokio;
44        pub mod benchmarks;
45    }
46}
47mod network;
48mod process;
49mod storage;
50pub mod telemetry;
51mod utils;
52pub use utils::*;
53#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
54mod iouring;
55
56/// Prefix for runtime metrics.
57const METRICS_PREFIX: &str = "runtime";
58
59/// Errors that can occur when interacting with the runtime.
60#[derive(Error, Debug)]
61pub enum Error {
62    #[error("exited")]
63    Exited,
64    #[error("closed")]
65    Closed,
66    #[error("timeout")]
67    Timeout,
68    #[error("bind failed")]
69    BindFailed,
70    #[error("connection failed")]
71    ConnectionFailed,
72    #[error("write failed")]
73    WriteFailed,
74    #[error("read failed")]
75    ReadFailed,
76    #[error("send failed")]
77    SendFailed,
78    #[error("recv failed")]
79    RecvFailed,
80    #[error("dns resolution failed: {0}")]
81    ResolveFailed(String),
82    #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
83    PartitionNameInvalid(String),
84    #[error("partition creation failed: {0}")]
85    PartitionCreationFailed(String),
86    #[error("partition missing: {0}")]
87    PartitionMissing(String),
88    #[error("partition corrupt: {0}")]
89    PartitionCorrupt(String),
90    #[error("blob open failed: {0}/{1} error: {2}")]
91    BlobOpenFailed(String, String, IoError),
92    #[error("blob missing: {0}/{1}")]
93    BlobMissing(String, String),
94    #[error("blob resize failed: {0}/{1} error: {2}")]
95    BlobResizeFailed(String, String, IoError),
96    #[error("blob sync failed: {0}/{1} error: {2}")]
97    BlobSyncFailed(String, String, IoError),
98    #[error("blob insufficient length")]
99    BlobInsufficientLength,
100    #[error("offset overflow")]
101    OffsetOverflow,
102    #[error("io error: {0}")]
103    Io(#[from] IoError),
104}
105
106/// Interface that any task scheduler must implement to start
107/// running tasks.
108pub trait Runner {
109    /// Context defines the environment available to tasks.
110    /// Example of possible services provided by the context include:
111    /// - [Clock] for time-based operations
112    /// - [Network] for network operations
113    /// - [Storage] for storage operations
114    type Context;
115
116    /// Start running a root task.
117    ///
118    /// When this function returns, all spawned tasks will be canceled. If clean
119    /// shutdown cannot be implemented via `Drop`, consider using [Spawner::stop] and
120    /// [Spawner::stopped] to coordinate clean shutdown.
121    fn start<F, Fut>(self, f: F) -> Fut::Output
122    where
123        F: FnOnce(Self::Context) -> Fut,
124        Fut: Future;
125}
126
127/// Interface that any task scheduler must implement to spawn tasks.
128pub trait Spawner: Clone + Send + Sync + 'static {
129    /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
130    ///
131    /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
132    /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
133    /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
134    /// [`Spawner::dedicated`] instead.
135    ///
136    /// The shared executor with `blocking == false` is the default spawn mode.
137    fn shared(self, blocking: bool) -> Self;
138
139    /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
140    ///
141    /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
142    /// shared executor.
143    ///
144    /// This is not the default behavior. See [`Spawner::shared`] for more information.
145    fn dedicated(self) -> Self;
146
147    /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
148    fn instrumented(self) -> Self;
149
150    /// Spawn a task with the current context.
151    ///
152    /// Unlike directly awaiting a future, the task starts running immediately even if the caller
153    /// never awaits the returned [`Handle`].
154    ///
155    /// # Mandatory Supervision
156    ///
157    /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
158    ///
159    /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
160    /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
161    ///
162    /// ```txt
163    /// ctx_a
164    ///   |
165    ///   +-- clone() ---> ctx_c
166    ///   |                  |
167    ///   |                  +-- spawn() ---> Task C (ctx_d)
168    ///   |
169    ///   +-- spawn() ---> Task A (ctx_b)
170    ///                              |
171    ///                              +-- spawn() ---> Task B (ctx_e)
172    ///
173    /// Task A finishes or aborts --> Task B and Task C are aborted
174    /// ```
175    ///
176    /// # Spawn Configuration
177    ///
178    /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
179    /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
180    ///
181    /// Child tasks should assume they start from a clean configuration without needing to inspect how their
182    /// parent was configured.
183    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
184    where
185        F: FnOnce(Self) -> Fut + Send + 'static,
186        Fut: Future<Output = T> + Send + 'static,
187        T: Send + 'static;
188
189    /// Signals the runtime to stop execution and waits for all outstanding tasks
190    /// to perform any required cleanup and exit.
191    ///
192    /// This method does not actually kill any tasks but rather signals to them, using
193    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
194    /// It then waits for all [signal::Signal] references to be dropped before returning.
195    ///
196    /// ## Multiple Stop Calls
197    ///
198    /// This method is idempotent and safe to call multiple times concurrently (on
199    /// different instances of the same context since it consumes `self`). The first
200    /// call initiates shutdown with the provided `value`, and all subsequent calls
201    /// will wait for the same completion regardless of their `value` parameter, i.e.
202    /// the original `value` from the first call is preserved.
203    ///
204    /// ## Timeout
205    ///
206    /// If a timeout is provided, the method will return an error if all [signal::Signal]
207    /// references have not been dropped within the specified duration.
208    fn stop(
209        self,
210        value: i32,
211        timeout: Option<Duration>,
212    ) -> impl Future<Output = Result<(), Error>> + Send;
213
214    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
215    /// any task.
216    ///
217    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
218    /// immediately. The [signal::Signal] returned will always resolve to the value of the
219    /// first [Spawner::stop] call.
220    fn stopped(&self) -> signal::Signal;
221}
222
223/// Interface to register and encode metrics.
224pub trait Metrics: Clone + Send + Sync + 'static {
225    /// Get the current label of the context.
226    fn label(&self) -> String;
227
228    /// Create a new instance of `Metrics` with the given label appended to the end
229    /// of the current `Metrics` label.
230    ///
231    /// This is commonly used to create a nested context for `register`.
232    ///
233    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
234    /// label (reserved for metrics for the runtime).
235    fn with_label(&self, label: &str) -> Self;
236
237    /// Prefix the given label with the current context's label.
238    ///
239    /// Unlike `with_label`, this method does not create a new context.
240    fn scoped_label(&self, label: &str) -> String {
241        let label = if self.label().is_empty() {
242            label.to_string()
243        } else {
244            format!("{}_{}", self.label(), label)
245        };
246        assert!(
247            !label.starts_with(METRICS_PREFIX),
248            "using runtime label is not allowed"
249        );
250        label
251    }
252
253    /// Register a metric with the runtime.
254    ///
255    /// Any registered metric will include (as a prefix) the label of the current context.
256    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
257
258    /// Encode all metrics into a buffer.
259    fn encode(&self) -> String;
260}
261
262/// Re-export of [governor::Quota] for rate limiting configuration.
263pub use governor::Quota;
264
265/// A direct (non-keyed) rate limiter using the provided [governor::clock::Clock] `C`.
266///
267/// This is a convenience type alias for creating single-entity rate limiters.
268/// For per-key rate limiting, use [KeyedRateLimiter].
269pub type RateLimiter<C> = governor::RateLimiter<
270    governor::state::NotKeyed,
271    governor::state::InMemoryState,
272    C,
273    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
274>;
275
276/// A rate limiter keyed by `K` using the provided [governor::clock::Clock] `C`.
277///
278/// This is a convenience type alias for creating per-peer rate limiters
279/// using governor's [HashMapStateStore].
280///
281/// [HashMapStateStore]: governor::state::keyed::HashMapStateStore
282pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
283    K,
284    governor::state::keyed::HashMapStateStore<K>,
285    C,
286    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
287>;
288
289/// Interface that any task scheduler must implement to provide
290/// time-based operations.
291///
292/// It is necessary to mock time to provide deterministic execution
293/// of arbitrary tasks.
294pub trait Clock:
295    governor::clock::Clock<Instant = SystemTime>
296    + governor::clock::ReasonablyRealtime
297    + Clone
298    + Send
299    + Sync
300    + 'static
301{
302    /// Returns the current time.
303    fn current(&self) -> SystemTime;
304
305    /// Sleep for the given duration.
306    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
307
308    /// Sleep until the given deadline.
309    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
310
311    /// Await a future with a timeout, returning `Error::Timeout` if it expires.
312    ///
313    /// # Examples
314    ///
315    /// ```
316    /// use std::time::Duration;
317    /// use commonware_runtime::{deterministic, Error, Runner, Clock};
318    ///
319    /// let executor = deterministic::Runner::default();
320    /// executor.start(|context| async move {
321    ///     match context
322    ///         .timeout(Duration::from_millis(100), async { 42 })
323    ///         .await
324    ///     {
325    ///         Ok(value) => assert_eq!(value, 42),
326    ///         Err(Error::Timeout) => panic!("should not timeout"),
327    ///         Err(e) => panic!("unexpected error: {:?}", e),
328    ///     }
329    /// });
330    /// ```
331    fn timeout<F, T>(
332        &self,
333        duration: Duration,
334        future: F,
335    ) -> impl Future<Output = Result<T, Error>> + Send + '_
336    where
337        F: Future<Output = T> + Send + 'static,
338        T: Send + 'static,
339    {
340        async move {
341            select! {
342                result = future => {
343                    Ok(result)
344                },
345                _ = self.sleep(duration) => {
346                    Err(Error::Timeout)
347                },
348            }
349        }
350    }
351}
352
353cfg_if::cfg_if! {
354    if #[cfg(feature = "external")] {
355        /// Interface that runtimes can implement to constrain the execution latency of a future.
356        pub trait Pacer: Clock + Clone + Send + Sync + 'static {
357            /// Defer completion of a future until a specified `latency` has elapsed. If the future is
358            /// not yet ready at the desired time of completion, the runtime will block until the future
359            /// is ready.
360            ///
361            /// In [crate::deterministic], this is used to ensure interactions with external systems can
362            /// be interacted with deterministically. In [crate::tokio], this is a no-op (allows
363            /// multiple runtimes to be tested with no code changes).
364            ///
365            /// # Setting Latency
366            ///
367            /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
368            /// the expected resolution latency of the future. To better explore the possible behavior of an
369            /// application, users can set latency to a randomly chosen value in the range of
370            /// `[expected latency / 2, expected latency * 2]`.
371            ///
372            /// # Warning
373            ///
374            /// Because `pace` blocks if the future is not ready, it is important that the future's completion
375            /// doesn't require anything in the current thread to complete (or else it will deadlock).
376            fn pace<'a, F, T>(
377                &'a self,
378                latency: Duration,
379                future: F,
380            ) -> impl Future<Output = T> + Send + 'a
381            where
382                F: Future<Output = T> + Send + 'a,
383                T: Send + 'a;
384        }
385
386        /// Extension trait that makes it more ergonomic to use [Pacer].
387        ///
388        /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
389        /// runtime should delay completion relative to the clock.
390        pub trait FutureExt: Future + Send + Sized {
391            /// Delay completion of the future until a specified `latency` on `pacer`.
392            fn pace<'a, E>(
393                self,
394                pacer: &'a E,
395                latency: Duration,
396            ) -> impl Future<Output = Self::Output> + Send + 'a
397            where
398                E: Pacer + 'a,
399                Self: Send + 'a,
400                Self::Output: Send + 'a,
401            {
402                pacer.pace(latency, self)
403            }
404        }
405
406        impl<F> FutureExt for F where F: Future + Send {}
407    }
408}
409
410/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
411pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
412
413/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
414pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
415
416/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
417pub type ListenerOf<N> = <N as crate::Network>::Listener;
418
419/// Interface that any runtime must implement to create
420/// network connections.
421pub trait Network: Clone + Send + Sync + 'static {
422    /// The type of [Listener] that's returned when binding to a socket.
423    /// Accepting a connection returns a [Sink] and [Stream] which are defined
424    /// by the [Listener] and used to send and receive data over the connection.
425    type Listener: Listener;
426
427    /// Bind to the given socket address.
428    fn bind(
429        &self,
430        socket: SocketAddr,
431    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
432
433    /// Dial the given socket address.
434    fn dial(
435        &self,
436        socket: SocketAddr,
437    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
438}
439
440/// Interface for DNS resolution.
441pub trait Resolver: Clone + Send + Sync + 'static {
442    /// Resolve a hostname to IP addresses.
443    ///
444    /// Returns a list of IP addresses that the hostname resolves to.
445    fn resolve(
446        &self,
447        host: &str,
448    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
449}
450
451/// Interface that any runtime must implement to handle
452/// incoming network connections.
453pub trait Listener: Sync + Send + 'static {
454    /// The type of [Sink] that's returned when accepting a connection.
455    /// This is used to send data to the remote connection.
456    type Sink: Sink;
457    /// The type of [Stream] that's returned when accepting a connection.
458    /// This is used to receive data from the remote connection.
459    type Stream: Stream;
460
461    /// Accept an incoming connection.
462    fn accept(
463        &mut self,
464    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
465
466    /// Returns the local address of the listener.
467    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
468}
469
470/// Interface that any runtime must implement to send
471/// messages over a network connection.
472pub trait Sink: Sync + Send + 'static {
473    /// Send a message to the sink.
474    ///
475    /// # Warning
476    ///
477    /// If the sink returns an error, part of the message may still be delivered.
478    fn send(
479        &mut self,
480        msg: impl Into<StableBuf> + Send,
481    ) -> impl Future<Output = Result<(), Error>> + Send;
482}
483
484/// Interface that any runtime must implement to receive
485/// messages over a network connection.
486pub trait Stream: Sync + Send + 'static {
487    /// Receive a message from the stream, storing it in the given buffer.
488    /// Reads exactly the number of bytes that fit in the buffer.
489    ///
490    /// # Warning
491    ///
492    /// If the stream returns an error, partially read data may be discarded.
493    fn recv(
494        &mut self,
495        buf: impl Into<StableBuf> + Send,
496    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
497}
498
499/// Interface to interact with storage.
500///
501/// To support storage implementations that enable concurrent reads and
502/// writes, blobs are responsible for maintaining synchronization.
503///
504/// Storage can be backed by a local filesystem, cloud storage, etc.
505///
506/// # Partition Names
507///
508/// Partition names must be non-empty and contain only ASCII alphanumeric
509/// characters, dashes (`-`), or underscores (`_`). Names containing other
510/// characters (e.g., `/`, `.`, spaces) will return an error.
511pub trait Storage: Clone + Send + Sync + 'static {
512    /// The readable/writeable storage buffer that can be opened by this Storage.
513    type Blob: Blob;
514
515    /// Open an existing blob in a given partition or create a new one, returning
516    /// the blob and its length.
517    ///
518    /// Multiple instances of the same blob can be opened concurrently, however,
519    /// writing to the same blob concurrently may lead to undefined behavior.
520    ///
521    /// An Ok result indicates the blob is durably created (or already exists).
522    fn open(
523        &self,
524        partition: &str,
525        name: &[u8],
526    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
527
528    /// Remove a blob from a given partition.
529    ///
530    /// If no `name` is provided, the entire partition is removed.
531    ///
532    /// An Ok result indicates the blob is durably removed.
533    fn remove(
534        &self,
535        partition: &str,
536        name: Option<&[u8]>,
537    ) -> impl Future<Output = Result<(), Error>> + Send;
538
539    /// Return all blobs in a given partition.
540    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
541}
542
543/// Interface to read and write to a blob.
544///
545/// To support blob implementations that enable concurrent reads and
546/// writes, blobs are responsible for maintaining synchronization.
547///
548/// Cloning a blob is similar to wrapping a single file descriptor in
549/// a lock whereas opening a new blob (of the same name) is similar to
550/// opening a new file descriptor. If multiple blobs are opened with the same
551/// name, they are not expected to coordinate access to underlying storage
552/// and writing to both is undefined behavior.
553///
554/// When a blob is dropped, any unsynced changes may be discarded. Implementations
555/// may attempt to sync during drop but errors will go unhandled. Call `sync`
556/// before dropping to ensure all changes are durably persisted.
557#[allow(clippy::len_without_is_empty)]
558pub trait Blob: Clone + Send + Sync + 'static {
559    /// Read from the blob at the given offset.
560    ///
561    /// `read_at` does not return the number of bytes read because it
562    /// only returns once the entire buffer has been filled.
563    fn read_at(
564        &self,
565        buf: impl Into<StableBuf> + Send,
566        offset: u64,
567    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
568
569    /// Write `buf` to the blob at the given offset.
570    fn write_at(
571        &self,
572        buf: impl Into<StableBuf> + Send,
573        offset: u64,
574    ) -> impl Future<Output = Result<(), Error>> + Send;
575
576    /// Resize the blob to the given length.
577    ///
578    /// If the length is greater than the current length, the blob is extended with zeros.
579    /// If the length is less than the current length, the blob is resized.
580    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
581
582    /// Ensure all pending data is durably persisted.
583    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
584}
585
586#[cfg(test)]
587mod tests {
588    use super::*;
589    use crate::telemetry::traces::collector::TraceStorage;
590    use bytes::Bytes;
591    use commonware_macros::{select, test_collect_traces};
592    use futures::{
593        channel::{mpsc, oneshot},
594        future::{pending, ready},
595        join, pin_mut, FutureExt, SinkExt, StreamExt,
596    };
597    use prometheus_client::metrics::counter::Counter;
598    use std::{
599        collections::HashMap,
600        net::{IpAddr, Ipv4Addr, Ipv6Addr},
601        pin::Pin,
602        str::FromStr,
603        sync::{
604            atomic::{AtomicU32, Ordering},
605            Arc, Mutex,
606        },
607        task::{Context as TContext, Poll, Waker},
608    };
609    use tracing::{error, Level};
610    use utils::reschedule;
611
612    fn test_error_future<R: Runner>(runner: R) {
613        async fn error_future() -> Result<&'static str, &'static str> {
614            Err("An error occurred")
615        }
616        let result = runner.start(|_| error_future());
617        assert_eq!(result, Err("An error occurred"));
618    }
619
620    fn test_clock_sleep<R: Runner>(runner: R)
621    where
622        R::Context: Spawner + Clock,
623    {
624        runner.start(|context| async move {
625            // Capture initial time
626            let start = context.current();
627            let sleep_duration = Duration::from_millis(10);
628            context.sleep(sleep_duration).await;
629
630            // After run, time should have advanced
631            let end = context.current();
632            assert!(end.duration_since(start).unwrap() >= sleep_duration);
633        });
634    }
635
636    fn test_clock_sleep_until<R: Runner>(runner: R)
637    where
638        R::Context: Spawner + Clock + Metrics,
639    {
640        runner.start(|context| async move {
641            // Trigger sleep
642            let now = context.current();
643            context.sleep_until(now + Duration::from_millis(100)).await;
644
645            // Ensure slept duration has elapsed
646            let elapsed = now.elapsed().unwrap();
647            assert!(elapsed >= Duration::from_millis(100));
648        });
649    }
650
651    fn test_clock_timeout<R: Runner>(runner: R)
652    where
653        R::Context: Spawner + Clock,
654    {
655        runner.start(|context| async move {
656            // Future completes before timeout
657            let result = context
658                .timeout(Duration::from_millis(100), async { "success" })
659                .await;
660            assert_eq!(result.unwrap(), "success");
661
662            // Future exceeds timeout duration
663            let result = context
664                .timeout(Duration::from_millis(50), pending::<()>())
665                .await;
666            assert!(matches!(result, Err(Error::Timeout)));
667
668            // Future completes within timeout
669            let result = context
670                .timeout(
671                    Duration::from_millis(100),
672                    context.sleep(Duration::from_millis(50)),
673                )
674                .await;
675            assert!(result.is_ok());
676        });
677    }
678
679    fn test_root_finishes<R: Runner>(runner: R)
680    where
681        R::Context: Spawner,
682    {
683        runner.start(|context| async move {
684            context.spawn(|_| async move {
685                loop {
686                    reschedule().await;
687                }
688            });
689        });
690    }
691
692    fn test_spawn_after_abort<R>(runner: R)
693    where
694        R: Runner,
695        R::Context: Spawner + Clone,
696    {
697        runner.start(|context| async move {
698            // Create a child context
699            let child = context.clone();
700
701            // Spawn parent and abort
702            let parent_handle = context.spawn(move |_| async move {
703                pending::<()>().await;
704            });
705            parent_handle.abort();
706
707            // Spawn child and ensure it aborts
708            let child_handle = child.spawn(move |_| async move {
709                pending::<()>().await;
710            });
711            assert!(matches!(child_handle.await, Err(Error::Closed)));
712        });
713    }
714
715    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
716    where
717        R::Context: Spawner,
718    {
719        runner.start(|context| async move {
720            let context = if dedicated {
721                assert!(!blocking);
722                context.dedicated()
723            } else {
724                context.shared(blocking)
725            };
726
727            let handle = context.spawn(|_| async move {
728                loop {
729                    reschedule().await;
730                }
731            });
732            handle.abort();
733            assert!(matches!(handle.await, Err(Error::Closed)));
734        });
735    }
736
737    fn test_panic_aborts_root<R: Runner>(runner: R) {
738        let result: Result<(), Error> = runner.start(|_| async move {
739            panic!("blah");
740        });
741        result.unwrap_err();
742    }
743
744    fn test_panic_aborts_spawn<R: Runner>(runner: R)
745    where
746        R::Context: Spawner + Clock,
747    {
748        runner.start(|context| async move {
749            context.clone().spawn(|_| async move {
750                panic!("blah");
751            });
752
753            // Loop until panic
754            loop {
755                context.sleep(Duration::from_millis(100)).await;
756            }
757        });
758    }
759
760    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
761    where
762        R::Context: Spawner + Clock,
763    {
764        let result: Result<(), Error> = runner.start(|context| async move {
765            let result = context.clone().spawn(|_| async move {
766                panic!("blah");
767            });
768            result.await
769        });
770        assert!(matches!(result, Err(Error::Exited)));
771    }
772
773    fn test_multiple_panics<R: Runner>(runner: R)
774    where
775        R::Context: Spawner + Clock,
776    {
777        runner.start(|context| async move {
778            context.clone().spawn(|_| async move {
779                panic!("boom 1");
780            });
781            context.clone().spawn(|_| async move {
782                panic!("boom 2");
783            });
784            context.clone().spawn(|_| async move {
785                panic!("boom 3");
786            });
787
788            // Loop until panic
789            loop {
790                context.sleep(Duration::from_millis(100)).await;
791            }
792        });
793    }
794
795    fn test_multiple_panics_caught<R: Runner>(runner: R)
796    where
797        R::Context: Spawner + Clock,
798    {
799        let (res1, res2, res3) = runner.start(|context| async move {
800            let handle1 = context.clone().spawn(|_| async move {
801                panic!("boom 1");
802            });
803            let handle2 = context.clone().spawn(|_| async move {
804                panic!("boom 2");
805            });
806            let handle3 = context.clone().spawn(|_| async move {
807                panic!("boom 3");
808            });
809
810            join!(handle1, handle2, handle3)
811        });
812        assert!(matches!(res1, Err(Error::Exited)));
813        assert!(matches!(res2, Err(Error::Exited)));
814        assert!(matches!(res3, Err(Error::Exited)));
815    }
816
817    fn test_select<R: Runner>(runner: R) {
818        runner.start(|_| async move {
819            // Test first branch
820            let output = Mutex::new(0);
821            select! {
822                v1 = ready(1) => {
823                    *output.lock().unwrap() = v1;
824                },
825                v2 = ready(2) => {
826                    *output.lock().unwrap() = v2;
827                },
828            };
829            assert_eq!(*output.lock().unwrap(), 1);
830
831            // Test second branch
832            select! {
833                v1 = std::future::pending::<i32>() => {
834                    *output.lock().unwrap() = v1;
835                },
836                v2 = ready(2) => {
837                    *output.lock().unwrap() = v2;
838                },
839            };
840            assert_eq!(*output.lock().unwrap(), 2);
841        });
842    }
843
844    /// Ensure future fusing works as expected.
845    fn test_select_loop<R: Runner>(runner: R)
846    where
847        R::Context: Clock,
848    {
849        runner.start(|context| async move {
850            // Should hit timeout
851            let (mut sender, mut receiver) = mpsc::unbounded();
852            for _ in 0..2 {
853                select! {
854                    v = receiver.next() => {
855                        panic!("unexpected value: {v:?}");
856                    },
857                    _ = context.sleep(Duration::from_millis(100)) => {
858                        continue;
859                    },
860                };
861            }
862
863            // Populate channel
864            sender.send(0).await.unwrap();
865            sender.send(1).await.unwrap();
866
867            // Prefer not reading channel without losing messages
868            select! {
869                _ = async {} => {
870                    // Skip reading from channel even though populated
871                },
872                v = receiver.next() => {
873                    panic!("unexpected value: {v:?}");
874                },
875            };
876
877            // Process messages
878            for i in 0..2 {
879                select! {
880                    _ = context.sleep(Duration::from_millis(100)) => {
881                        panic!("timeout");
882                    },
883                    v = receiver.next() => {
884                        assert_eq!(v.unwrap(), i);
885                    },
886                };
887            }
888        });
889    }
890
891    fn test_storage_operations<R: Runner>(runner: R)
892    where
893        R::Context: Storage,
894    {
895        runner.start(|context| async move {
896            let partition = "test_partition";
897            let name = b"test_blob";
898
899            // Open a new blob
900            let (blob, _) = context
901                .open(partition, name)
902                .await
903                .expect("Failed to open blob");
904
905            // Write data to the blob
906            let data = b"Hello, Storage!";
907            blob.write_at(Vec::from(data), 0)
908                .await
909                .expect("Failed to write to blob");
910
911            // Sync the blob
912            blob.sync().await.expect("Failed to sync blob");
913
914            // Read data from the blob
915            let read = blob
916                .read_at(vec![0; data.len()], 0)
917                .await
918                .expect("Failed to read from blob");
919            assert_eq!(read.as_ref(), data);
920
921            // Sync the blob
922            blob.sync().await.expect("Failed to sync blob");
923
924            // Scan blobs in the partition
925            let blobs = context
926                .scan(partition)
927                .await
928                .expect("Failed to scan partition");
929            assert!(blobs.contains(&name.to_vec()));
930
931            // Reopen the blob
932            let (blob, len) = context
933                .open(partition, name)
934                .await
935                .expect("Failed to reopen blob");
936            assert_eq!(len, data.len() as u64);
937
938            // Read data part of message back
939            let read = blob
940                .read_at(vec![0u8; 7], 7)
941                .await
942                .expect("Failed to read data");
943            assert_eq!(read.as_ref(), b"Storage");
944
945            // Sync the blob
946            blob.sync().await.expect("Failed to sync blob");
947
948            // Remove the blob
949            context
950                .remove(partition, Some(name))
951                .await
952                .expect("Failed to remove blob");
953
954            // Ensure the blob is removed
955            let blobs = context
956                .scan(partition)
957                .await
958                .expect("Failed to scan partition");
959            assert!(!blobs.contains(&name.to_vec()));
960
961            // Remove the partition
962            context
963                .remove(partition, None)
964                .await
965                .expect("Failed to remove partition");
966
967            // Scan the partition
968            let result = context.scan(partition).await;
969            assert!(matches!(result, Err(Error::PartitionMissing(_))));
970        });
971    }
972
973    fn test_blob_read_write<R: Runner>(runner: R)
974    where
975        R::Context: Storage,
976    {
977        runner.start(|context| async move {
978            let partition = "test_partition";
979            let name = b"test_blob_rw";
980
981            // Open a new blob
982            let (blob, _) = context
983                .open(partition, name)
984                .await
985                .expect("Failed to open blob");
986
987            // Write data at different offsets
988            let data1 = b"Hello";
989            let data2 = b"World";
990            blob.write_at(Vec::from(data1), 0)
991                .await
992                .expect("Failed to write data1");
993            blob.write_at(Vec::from(data2), 5)
994                .await
995                .expect("Failed to write data2");
996
997            // Read data back
998            let read = blob
999                .read_at(vec![0u8; 10], 0)
1000                .await
1001                .expect("Failed to read data");
1002            assert_eq!(&read.as_ref()[..5], data1);
1003            assert_eq!(&read.as_ref()[5..], data2);
1004
1005            // Read past end of blob
1006            let result = blob.read_at(vec![0u8; 10], 10).await;
1007            assert!(result.is_err());
1008
1009            // Rewrite data without affecting length
1010            let data3 = b"Store";
1011            blob.write_at(Vec::from(data3), 5)
1012                .await
1013                .expect("Failed to write data3");
1014
1015            // Read data back
1016            let read = blob
1017                .read_at(vec![0u8; 10], 0)
1018                .await
1019                .expect("Failed to read data");
1020            assert_eq!(&read.as_ref()[..5], data1);
1021            assert_eq!(&read.as_ref()[5..], data3);
1022
1023            // Read past end of blob
1024            let result = blob.read_at(vec![0u8; 10], 10).await;
1025            assert!(result.is_err());
1026        });
1027    }
1028
1029    fn test_blob_resize<R: Runner>(runner: R)
1030    where
1031        R::Context: Storage,
1032    {
1033        runner.start(|context| async move {
1034            let partition = "test_partition_resize";
1035            let name = b"test_blob_resize";
1036
1037            // Open and write to a new blob
1038            let (blob, _) = context
1039                .open(partition, name)
1040                .await
1041                .expect("Failed to open blob");
1042
1043            let data = b"some data";
1044            blob.write_at(data.to_vec(), 0)
1045                .await
1046                .expect("Failed to write");
1047            blob.sync().await.expect("Failed to sync after write");
1048
1049            // Re-open and check length
1050            let (blob, len) = context.open(partition, name).await.unwrap();
1051            assert_eq!(len, data.len() as u64);
1052
1053            // Resize to extend the file
1054            let new_len = (data.len() as u64) * 2;
1055            blob.resize(new_len)
1056                .await
1057                .expect("Failed to resize to extend");
1058            blob.sync().await.expect("Failed to sync after resize");
1059
1060            // Re-open and check length again
1061            let (blob, len) = context.open(partition, name).await.unwrap();
1062            assert_eq!(len, new_len);
1063
1064            // Read original data
1065            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1066            assert_eq!(read_buf.as_ref(), data);
1067
1068            // Read extended part (should be zeros)
1069            let extended_part = blob
1070                .read_at(vec![0; data.len()], data.len() as u64)
1071                .await
1072                .unwrap();
1073            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1074
1075            // Truncate the blob
1076            blob.resize(data.len() as u64).await.unwrap();
1077            blob.sync().await.unwrap();
1078
1079            // Reopen to check truncation
1080            let (blob, size) = context.open(partition, name).await.unwrap();
1081            assert_eq!(size, data.len() as u64);
1082
1083            // Read truncated data
1084            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1085            assert_eq!(read_buf.as_ref(), data);
1086            blob.sync().await.unwrap();
1087        });
1088    }
1089
1090    fn test_many_partition_read_write<R: Runner>(runner: R)
1091    where
1092        R::Context: Storage,
1093    {
1094        runner.start(|context| async move {
1095            let partitions = ["partition1", "partition2", "partition3"];
1096            let name = b"test_blob_rw";
1097            let data1 = b"Hello";
1098            let data2 = b"World";
1099
1100            for (additional, partition) in partitions.iter().enumerate() {
1101                // Open a new blob
1102                let (blob, _) = context
1103                    .open(partition, name)
1104                    .await
1105                    .expect("Failed to open blob");
1106
1107                // Write data at different offsets
1108                blob.write_at(Vec::from(data1), 0)
1109                    .await
1110                    .expect("Failed to write data1");
1111                blob.write_at(Vec::from(data2), 5 + additional as u64)
1112                    .await
1113                    .expect("Failed to write data2");
1114
1115                // Sync the blob
1116                blob.sync().await.expect("Failed to sync blob");
1117            }
1118
1119            for (additional, partition) in partitions.iter().enumerate() {
1120                // Open a new blob
1121                let (blob, len) = context
1122                    .open(partition, name)
1123                    .await
1124                    .expect("Failed to open blob");
1125                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1126
1127                // Read data back
1128                let read = blob
1129                    .read_at(vec![0u8; 10 + additional], 0)
1130                    .await
1131                    .expect("Failed to read data");
1132                assert_eq!(&read.as_ref()[..5], b"Hello");
1133                assert_eq!(&read.as_ref()[5 + additional..], b"World");
1134            }
1135        });
1136    }
1137
1138    fn test_blob_read_past_length<R: Runner>(runner: R)
1139    where
1140        R::Context: Storage,
1141    {
1142        runner.start(|context| async move {
1143            let partition = "test_partition";
1144            let name = b"test_blob_rw";
1145
1146            // Open a new blob
1147            let (blob, _) = context
1148                .open(partition, name)
1149                .await
1150                .expect("Failed to open blob");
1151
1152            // Read data past file length (empty file)
1153            let result = blob.read_at(vec![0u8; 10], 0).await;
1154            assert!(result.is_err());
1155
1156            // Write data to the blob
1157            let data = b"Hello, Storage!".to_vec();
1158            blob.write_at(data, 0)
1159                .await
1160                .expect("Failed to write to blob");
1161
1162            // Read data past file length (non-empty file)
1163            let result = blob.read_at(vec![0u8; 20], 0).await;
1164            assert!(result.is_err());
1165        })
1166    }
1167
1168    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1169    where
1170        R::Context: Spawner + Storage + Metrics,
1171    {
1172        runner.start(|context| async move {
1173            let partition = "test_partition";
1174            let name = b"test_blob_rw";
1175
1176            // Open a new blob
1177            let (blob, _) = context
1178                .open(partition, name)
1179                .await
1180                .expect("Failed to open blob");
1181
1182            // Write data to the blob
1183            let data = b"Hello, Storage!";
1184            blob.write_at(Vec::from(data), 0)
1185                .await
1186                .expect("Failed to write to blob");
1187
1188            // Sync the blob
1189            blob.sync().await.expect("Failed to sync blob");
1190
1191            // Read data from the blob in clone
1192            let check1 = context.with_label("check1").spawn({
1193                let blob = blob.clone();
1194                move |_| async move {
1195                    let read = blob
1196                        .read_at(vec![0u8; data.len()], 0)
1197                        .await
1198                        .expect("Failed to read from blob");
1199                    assert_eq!(read.as_ref(), data);
1200                }
1201            });
1202            let check2 = context.with_label("check2").spawn({
1203                let blob = blob.clone();
1204                move |_| async move {
1205                    let read = blob
1206                        .read_at(vec![0; data.len()], 0)
1207                        .await
1208                        .expect("Failed to read from blob");
1209                    assert_eq!(read.as_ref(), data);
1210                }
1211            });
1212
1213            // Wait for both reads to complete
1214            let result = join!(check1, check2);
1215            assert!(result.0.is_ok());
1216            assert!(result.1.is_ok());
1217
1218            // Read data from the blob
1219            let read = blob
1220                .read_at(vec![0; data.len()], 0)
1221                .await
1222                .expect("Failed to read from blob");
1223            assert_eq!(read.as_ref(), data);
1224
1225            // Drop the blob
1226            drop(blob);
1227
1228            // Ensure no blobs still open
1229            let buffer = context.encode();
1230            assert!(buffer.contains("open_blobs 0"));
1231        });
1232    }
1233
1234    fn test_shutdown<R: Runner>(runner: R)
1235    where
1236        R::Context: Spawner + Metrics + Clock,
1237    {
1238        let kill = 9;
1239        runner.start(|context| async move {
1240            // Spawn a task that waits for signal
1241            let before = context
1242                .with_label("before")
1243                .spawn(move |context| async move {
1244                    let mut signal = context.stopped();
1245                    let value = (&mut signal).await.unwrap();
1246                    assert_eq!(value, kill);
1247                    drop(signal);
1248                });
1249
1250            // Signal the tasks and wait for them to stop
1251            let result = context.clone().stop(kill, None).await;
1252            assert!(result.is_ok());
1253
1254            // Spawn a task after stop is called
1255            let after = context
1256                .with_label("after")
1257                .spawn(move |context| async move {
1258                    // A call to `stopped()` after `stop()` resolves immediately
1259                    let value = context.stopped().await.unwrap();
1260                    assert_eq!(value, kill);
1261                });
1262
1263            // Ensure both tasks complete
1264            let result = join!(before, after);
1265            assert!(result.0.is_ok());
1266            assert!(result.1.is_ok());
1267        });
1268    }
1269
1270    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1271    where
1272        R::Context: Spawner + Metrics + Clock,
1273    {
1274        let kill = 42;
1275        runner.start(|context| async move {
1276            let (started_tx, mut started_rx) = mpsc::channel(3);
1277            let counter = Arc::new(AtomicU32::new(0));
1278
1279            // Spawn 3 tasks that do cleanup work after receiving stop signal
1280            // and increment a shared counter
1281            let task = |cleanup_duration: Duration| {
1282                let context = context.clone();
1283                let counter = counter.clone();
1284                let mut started_tx = started_tx.clone();
1285                context.spawn(move |context| async move {
1286                    // Wait for signal to be acquired
1287                    let mut signal = context.stopped();
1288                    started_tx.send(()).await.unwrap();
1289
1290                    // Increment once killed
1291                    let value = (&mut signal).await.unwrap();
1292                    assert_eq!(value, kill);
1293                    context.sleep(cleanup_duration).await;
1294                    counter.fetch_add(1, Ordering::SeqCst);
1295
1296                    // Wait to drop signal until work has been done
1297                    drop(signal);
1298                })
1299            };
1300
1301            let task1 = task(Duration::from_millis(10));
1302            let task2 = task(Duration::from_millis(20));
1303            let task3 = task(Duration::from_millis(30));
1304
1305            // Give tasks time to start
1306            for _ in 0..3 {
1307                started_rx.next().await.unwrap();
1308            }
1309
1310            // Stop and verify all cleanup completed
1311            context.stop(kill, None).await.unwrap();
1312            assert_eq!(counter.load(Ordering::SeqCst), 3);
1313
1314            // Ensure all tasks completed
1315            let result = join!(task1, task2, task3);
1316            assert!(result.0.is_ok());
1317            assert!(result.1.is_ok());
1318            assert!(result.2.is_ok());
1319        });
1320    }
1321
1322    fn test_shutdown_timeout<R: Runner>(runner: R)
1323    where
1324        R::Context: Spawner + Metrics + Clock,
1325    {
1326        let kill = 42;
1327        runner.start(|context| async move {
1328            // Setup startup coordinator
1329            let (started_tx, started_rx) = oneshot::channel();
1330
1331            // Spawn a task that never completes its cleanup
1332            context.clone().spawn(move |context| async move {
1333                let signal = context.stopped();
1334                started_tx.send(()).unwrap();
1335                pending::<()>().await;
1336                signal.await.unwrap();
1337            });
1338
1339            // Try to stop with a timeout
1340            started_rx.await.unwrap();
1341            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1342
1343            // Assert that we got a timeout error
1344            assert!(matches!(result, Err(Error::Timeout)));
1345        });
1346    }
1347
1348    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1349    where
1350        R::Context: Spawner + Metrics + Clock,
1351    {
1352        let kill1 = 42;
1353        let kill2 = 43;
1354
1355        runner.start(|context| async move {
1356            let (started_tx, started_rx) = oneshot::channel();
1357            let counter = Arc::new(AtomicU32::new(0));
1358
1359            // Spawn a task that delays completion to test timing
1360            let task = context.with_label("blocking_task").spawn({
1361                let counter = counter.clone();
1362                move |context| async move {
1363                    // Wait for signal to be acquired
1364                    let mut signal = context.stopped();
1365                    started_tx.send(()).unwrap();
1366
1367                    // Wait for signal to be resolved
1368                    let value = (&mut signal).await.unwrap();
1369                    assert_eq!(value, kill1);
1370                    context.sleep(Duration::from_millis(50)).await;
1371
1372                    // Increment counter
1373                    counter.fetch_add(1, Ordering::SeqCst);
1374                    drop(signal);
1375                }
1376            });
1377
1378            // Give task time to start
1379            started_rx.await.unwrap();
1380
1381            // Issue two separate stop calls
1382            // The second stop call uses a different stop value that should be ignored
1383            let stop_task1 = context.clone().stop(kill1, None);
1384            pin_mut!(stop_task1);
1385            let stop_task2 = context.clone().stop(kill2, None);
1386            pin_mut!(stop_task2);
1387
1388            // Both of them should be awaiting completion
1389            assert!(stop_task1.as_mut().now_or_never().is_none());
1390            assert!(stop_task2.as_mut().now_or_never().is_none());
1391
1392            // Wait for both stop calls to complete
1393            assert!(stop_task1.await.is_ok());
1394            assert!(stop_task2.await.is_ok());
1395
1396            // Verify first stop value wins
1397            let sig = context.stopped().await;
1398            assert_eq!(sig.unwrap(), kill1);
1399
1400            // Wait for blocking task to complete
1401            let result = task.await;
1402            assert!(result.is_ok());
1403            assert_eq!(counter.load(Ordering::SeqCst), 1);
1404
1405            // Post-completion stop should return immediately
1406            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1407        });
1408    }
1409
1410    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1411    where
1412        R::Context: Spawner + Metrics,
1413    {
1414        runner.start(|context| async move {
1415            // Spawn a task that waits for signal
1416            context
1417                .with_label("before")
1418                .spawn(move |context| async move {
1419                    let mut signal = context.stopped();
1420                    let value = (&mut signal).await.unwrap();
1421
1422                    // We should never reach this point
1423                    assert_eq!(value, 42);
1424                    drop(signal);
1425                });
1426
1427            // Ensure waker is registered
1428            reschedule().await;
1429        });
1430    }
1431
1432    fn test_spawn_dedicated<R: Runner>(runner: R)
1433    where
1434        R::Context: Spawner,
1435    {
1436        runner.start(|context| async move {
1437            let handle = context.dedicated().spawn(|_| async move { 42 });
1438            assert!(matches!(handle.await, Ok(42)));
1439        });
1440    }
1441
1442    fn test_spawn<R: Runner>(runner: R)
1443    where
1444        R::Context: Spawner + Clock,
1445    {
1446        runner.start(|context| async move {
1447            let child_handle = Arc::new(Mutex::new(None));
1448            let child_handle2 = child_handle.clone();
1449
1450            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1451            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1452            let parent_handle = context.spawn(move |context| async move {
1453                // Spawn child that completes immediately
1454                let handle = context.spawn(|_| async {});
1455
1456                // Store child handle so we can test it later
1457                *child_handle2.lock().unwrap() = Some(handle);
1458
1459                parent_initialized_tx.send(()).unwrap();
1460
1461                // Parent task completes
1462                parent_complete_rx.await.unwrap();
1463            });
1464
1465            // Wait for parent task to spawn the children
1466            parent_initialized_rx.await.unwrap();
1467
1468            // Child task completes successfully
1469            let child_handle = child_handle.lock().unwrap().take().unwrap();
1470            assert!(child_handle.await.is_ok());
1471
1472            // Complete the parent task
1473            parent_complete_tx.send(()).unwrap();
1474
1475            // Wait for parent task to complete successfully
1476            assert!(parent_handle.await.is_ok());
1477        });
1478    }
1479
1480    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1481    where
1482        R::Context: Spawner + Clock,
1483    {
1484        runner.start(|context| async move {
1485            let child_handle = Arc::new(Mutex::new(None));
1486            let child_handle2 = child_handle.clone();
1487
1488            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1489            let parent_handle = context.spawn(move |context| async move {
1490                // Spawn child task that hangs forever, should be aborted when parent aborts
1491                let handle = context.spawn(|_| pending::<()>());
1492
1493                // Store child task handle so we can test it later
1494                *child_handle2.lock().unwrap() = Some(handle);
1495
1496                parent_initialized_tx.send(()).unwrap();
1497
1498                // Parent task runs until aborted
1499                pending::<()>().await
1500            });
1501
1502            // Wait for parent task to spawn the children
1503            parent_initialized_rx.await.unwrap();
1504
1505            // Abort parent task
1506            parent_handle.abort();
1507            assert!(matches!(parent_handle.await, Err(Error::Closed)));
1508
1509            // Child task should also resolve with error since its parent aborted
1510            let child_handle = child_handle.lock().unwrap().take().unwrap();
1511            assert!(matches!(child_handle.await, Err(Error::Closed)));
1512        });
1513    }
1514
1515    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1516    where
1517        R::Context: Spawner + Clock,
1518    {
1519        runner.start(|context| async move {
1520            let child_handle = Arc::new(Mutex::new(None));
1521            let child_handle2 = child_handle.clone();
1522
1523            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1524            let parent_handle = context.spawn(move |context| async move {
1525                // Spawn child task that hangs forever, should be aborted when parent completes
1526                let handle = context.spawn(|_| pending::<()>());
1527
1528                // Store child task handle so we can test it later
1529                *child_handle2.lock().unwrap() = Some(handle);
1530
1531                // Parent task completes
1532                parent_complete_rx.await.unwrap();
1533            });
1534
1535            // Fire parent completion
1536            parent_complete_tx.send(()).unwrap();
1537
1538            // Wait for parent task to complete
1539            assert!(parent_handle.await.is_ok());
1540
1541            // Child task should resolve with error since its parent has completed
1542            let child_handle = child_handle.lock().unwrap().take().unwrap();
1543            assert!(matches!(child_handle.await, Err(Error::Closed)));
1544        });
1545    }
1546
1547    fn test_spawn_cascading_abort<R: Runner>(runner: R)
1548    where
1549        R::Context: Spawner + Clock,
1550    {
1551        runner.start(|context| async move {
1552            // We create the following tree of tasks. All tasks will run
1553            // indefinitely (until aborted).
1554            //
1555            //          root
1556            //     /     |     \
1557            //    /      |      \
1558            //   c0      c1      c2
1559            //  /  \    /  \    /  \
1560            // g0  g1  g2  g3  g4  g5
1561            let c0 = context.clone();
1562            let g0 = c0.clone();
1563            let g1 = c0.clone();
1564            let c1 = context.clone();
1565            let g2 = c1.clone();
1566            let g3 = c1.clone();
1567            let c2 = context.clone();
1568            let g4 = c2.clone();
1569            let g5 = c2.clone();
1570
1571            // Spawn tasks
1572            let handles = Arc::new(Mutex::new(Vec::new()));
1573            let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
1574            let root_task = context.spawn({
1575                let handles = handles.clone();
1576                move |_| async move {
1577                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1578                    {
1579                        let handle = context.spawn({
1580                            let handles = handles.clone();
1581                            let mut initialized_tx = initialized_tx.clone();
1582                            move |_| async move {
1583                                for grandchild in grandchildren {
1584                                    let handle = grandchild.spawn(|_| async {
1585                                        pending::<()>().await;
1586                                    });
1587                                    handles.lock().unwrap().push(handle);
1588                                    initialized_tx.send(()).await.unwrap();
1589                                }
1590
1591                                pending::<()>().await;
1592                            }
1593                        });
1594                        handles.lock().unwrap().push(handle);
1595                        initialized_tx.send(()).await.unwrap();
1596                    }
1597
1598                    pending::<()>().await;
1599                }
1600            });
1601
1602            // Wait for tasks to initialize
1603            for _ in 0..9 {
1604                initialized_rx.next().await.unwrap();
1605            }
1606
1607            // Verify we have all 9 handles (3 children + 6 grandchildren)
1608            assert_eq!(handles.lock().unwrap().len(), 9);
1609
1610            // Abort root task
1611            root_task.abort();
1612            assert!(matches!(root_task.await, Err(Error::Closed)));
1613
1614            // All handles should resolve with error due to cascading abort
1615            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1616            for handle in handles {
1617                assert!(matches!(handle.await, Err(Error::Closed)));
1618            }
1619        });
1620    }
1621
1622    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1623    where
1624        R::Context: Spawner + Clock,
1625    {
1626        runner.start(|context| async move {
1627            let (child_started_tx, child_started_rx) = oneshot::channel();
1628            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1629            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1630            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1631            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1632            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1633            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1634
1635            let parent = context.spawn(move |context| async move {
1636                // Spawn a child task
1637                let child_handle = context.clone().spawn(|_| async move {
1638                    child_started_tx.send(()).unwrap();
1639                    // Wait for signal to complete
1640                    child_complete_rx.await.unwrap();
1641                });
1642                assert!(
1643                    child_handle_tx.send(child_handle).is_ok(),
1644                    "child handle receiver dropped"
1645                );
1646
1647                // Spawn an independent sibling task
1648                let sibling_handle = context.clone().spawn(move |_| async move {
1649                    sibling_started_tx.send(()).unwrap();
1650                    // Wait for signal to complete
1651                    sibling_complete_rx.await.unwrap();
1652                });
1653                assert!(
1654                    sibling_handle_tx.send(sibling_handle).is_ok(),
1655                    "sibling handle receiver dropped"
1656                );
1657
1658                // Wait for signal to complete
1659                parent_complete_rx.await.unwrap();
1660            });
1661
1662            // Wait for both to start
1663            child_started_rx.await.unwrap();
1664            sibling_started_rx.await.unwrap();
1665
1666            // Kill the sibling
1667            sibling_complete_tx.send(()).unwrap();
1668            assert!(sibling_handle_rx.await.is_ok());
1669
1670            // The child task should still be alive
1671            child_complete_tx.send(()).unwrap();
1672            assert!(child_handle_rx.await.is_ok());
1673
1674            // As well as the parent
1675            parent_complete_tx.send(()).unwrap();
1676            assert!(parent.await.is_ok());
1677        });
1678    }
1679
1680    fn test_spawn_clone_chain<R: Runner>(runner: R)
1681    where
1682        R::Context: Spawner + Clock,
1683    {
1684        runner.start(|context| async move {
1685            let (parent_started_tx, parent_started_rx) = oneshot::channel();
1686            let (child_started_tx, child_started_rx) = oneshot::channel();
1687            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1688            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1689            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1690
1691            let parent = context.clone().spawn({
1692                move |context| async move {
1693                    let child = context.clone().spawn({
1694                        move |context| async move {
1695                            let grandchild = context.clone().spawn({
1696                                move |_| async move {
1697                                    grandchild_started_tx.send(()).unwrap();
1698                                    pending::<()>().await;
1699                                }
1700                            });
1701                            assert!(
1702                                grandchild_handle_tx.send(grandchild).is_ok(),
1703                                "grandchild handle receiver dropped"
1704                            );
1705                            child_started_tx.send(()).unwrap();
1706                            pending::<()>().await;
1707                        }
1708                    });
1709                    assert!(
1710                        child_handle_tx.send(child).is_ok(),
1711                        "child handle receiver dropped"
1712                    );
1713                    parent_started_tx.send(()).unwrap();
1714                    pending::<()>().await;
1715                }
1716            });
1717
1718            parent_started_rx.await.unwrap();
1719            child_started_rx.await.unwrap();
1720            grandchild_started_rx.await.unwrap();
1721
1722            let child_handle = child_handle_rx.await.unwrap();
1723            let grandchild_handle = grandchild_handle_rx.await.unwrap();
1724
1725            parent.abort();
1726            assert!(parent.await.is_err());
1727
1728            assert!(child_handle.await.is_err());
1729            assert!(grandchild_handle.await.is_err());
1730        });
1731    }
1732
1733    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1734    where
1735        R::Context: Spawner + Clock,
1736    {
1737        runner.start(|context| async move {
1738            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1739            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1740
1741            let parent = context.clone().spawn({
1742                move |context| async move {
1743                    let clone1 = context.clone();
1744                    let clone2 = clone1.clone();
1745                    let clone3 = clone2.clone();
1746
1747                    let leaf = clone3.clone().spawn({
1748                        move |_| async move {
1749                            leaf_started_tx.send(()).unwrap();
1750                            pending::<()>().await;
1751                        }
1752                    });
1753
1754                    leaf_handle_tx
1755                        .send(leaf)
1756                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1757                    pending::<()>().await;
1758                }
1759            });
1760
1761            leaf_started_rx.await.unwrap();
1762            let leaf_handle = leaf_handle_rx.await.unwrap();
1763
1764            parent.abort();
1765            assert!(parent.await.is_err());
1766            assert!(leaf_handle.await.is_err());
1767        });
1768    }
1769
1770    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1771    where
1772        R::Context: Spawner,
1773    {
1774        runner.start(|context| async move {
1775            let context = if dedicated {
1776                context.dedicated()
1777            } else {
1778                context.shared(true)
1779            };
1780
1781            let handle = context.spawn(|_| async move { 42 });
1782            let result = handle.await;
1783            assert!(matches!(result, Ok(42)));
1784        });
1785    }
1786
1787    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1788    where
1789        R::Context: Spawner + Clock,
1790    {
1791        runner.start(|context| async move {
1792            let context = if dedicated {
1793                context.dedicated()
1794            } else {
1795                context.shared(true)
1796            };
1797
1798            context.clone().spawn(|_| async move {
1799                panic!("blocking task panicked");
1800            });
1801
1802            // Loop until panic
1803            loop {
1804                context.sleep(Duration::from_millis(100)).await;
1805            }
1806        });
1807    }
1808
1809    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1810    where
1811        R::Context: Spawner + Clock,
1812    {
1813        let result: Result<(), Error> = runner.start(|context| async move {
1814            let context = if dedicated {
1815                context.dedicated()
1816            } else {
1817                context.shared(true)
1818            };
1819
1820            let handle = context.clone().spawn(|_| async move {
1821                panic!("blocking task panicked");
1822            });
1823            handle.await
1824        });
1825        assert!(matches!(result, Err(Error::Exited)));
1826    }
1827
1828    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1829        runner.start(|_| async move {
1830            // Setup tracked resource
1831            let dropper = Arc::new(());
1832            let executor = deterministic::Runner::default();
1833            executor.start({
1834                let dropper = dropper.clone();
1835                move |context| async move {
1836                    // Create tasks with circular dependencies through channels
1837                    let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1838                    let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1839                    let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1840
1841                    // Task 1 holds tx2 and waits on rx1
1842                    context.with_label("task1").spawn({
1843                        let mut setup_tx = setup_tx.clone();
1844                        let dropper = dropper.clone();
1845                        move |_| async move {
1846                            // Setup deadlock and mark ready
1847                            tx2.send(()).await.unwrap();
1848                            rx1.next().await.unwrap();
1849                            setup_tx.send(()).await.unwrap();
1850
1851                            // Wait forever
1852                            while rx1.next().await.is_some() {}
1853                            drop(tx2);
1854                            drop(dropper);
1855                        }
1856                    });
1857
1858                    // Task 2 holds tx1 and waits on rx2
1859                    context.with_label("task2").spawn(move |_| async move {
1860                        // Setup deadlock and mark ready
1861                        tx1.send(()).await.unwrap();
1862                        rx2.next().await.unwrap();
1863                        setup_tx.send(()).await.unwrap();
1864
1865                        // Wait forever
1866                        while rx2.next().await.is_some() {}
1867                        drop(tx1);
1868                        drop(dropper);
1869                    });
1870
1871                    // Wait for tasks to start
1872                    setup_rx.next().await.unwrap();
1873                    setup_rx.next().await.unwrap();
1874                }
1875            });
1876
1877            // After runtime drop, both tasks should be cleaned up
1878            Arc::try_unwrap(dropper).expect("references remaining");
1879        });
1880    }
1881
1882    fn test_late_waker<R: Runner>(runner: R)
1883    where
1884        R::Context: Metrics + Spawner,
1885    {
1886        // A future that captures its waker and sends it to the caller, then
1887        // stays pending forever.
1888        struct CaptureWaker {
1889            tx: Option<oneshot::Sender<Waker>>,
1890            sent: bool,
1891        }
1892        impl Future for CaptureWaker {
1893            type Output = ();
1894            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1895                if !self.sent {
1896                    if let Some(tx) = self.tx.take() {
1897                        // Send a clone of the current task's waker to the root
1898                        let _ = tx.send(cx.waker().clone());
1899                    }
1900                    self.sent = true;
1901                }
1902                Poll::Pending
1903            }
1904        }
1905
1906        // A guard that wakes the captured waker on drop.
1907        struct WakeOnDrop(Option<Waker>);
1908        impl Drop for WakeOnDrop {
1909            fn drop(&mut self) {
1910                if let Some(w) = self.0.take() {
1911                    w.wake_by_ref();
1912                }
1913            }
1914        }
1915
1916        // Run the executor to completion
1917        let holder = runner.start(|context| async move {
1918            // Wire a oneshot to receive the task waker.
1919            let (tx, rx) = oneshot::channel::<Waker>();
1920
1921            // Spawn a task that registers its waker and then stays pending.
1922            context
1923                .with_label("capture_waker")
1924                .spawn(move |_| async move {
1925                    CaptureWaker {
1926                        tx: Some(tx),
1927                        sent: false,
1928                    }
1929                    .await;
1930                });
1931
1932            // Ensure the spawned task runs and registers its waker.
1933            utils::reschedule().await;
1934
1935            // Receive the waker from the spawned task.
1936            let waker = rx.await.expect("waker not received");
1937
1938            // Return a guard that will wake after the runtime has dropped.
1939            WakeOnDrop(Some(waker))
1940        });
1941
1942        // Dropping the guard after the runtime has torn down will trigger a wake on
1943        // a task whose executor has been dropped.
1944        drop(holder);
1945    }
1946
1947    fn test_metrics<R: Runner>(runner: R)
1948    where
1949        R::Context: Metrics,
1950    {
1951        runner.start(|context| async move {
1952            // Assert label
1953            assert_eq!(context.label(), "");
1954
1955            // Register a metric
1956            let counter = Counter::<u64>::default();
1957            context.register("test", "test", counter.clone());
1958
1959            // Increment the counter
1960            counter.inc();
1961
1962            // Encode metrics
1963            let buffer = context.encode();
1964            assert!(buffer.contains("test_total 1"));
1965
1966            // Nested context
1967            let context = context.with_label("nested");
1968            let nested_counter = Counter::<u64>::default();
1969            context.register("test", "test", nested_counter.clone());
1970
1971            // Increment the counter
1972            nested_counter.inc();
1973
1974            // Encode metrics
1975            let buffer = context.encode();
1976            assert!(buffer.contains("nested_test_total 1"));
1977            assert!(buffer.contains("test_total 1"));
1978        });
1979    }
1980
1981    fn test_metrics_label<R: Runner>(runner: R)
1982    where
1983        R::Context: Metrics,
1984    {
1985        runner.start(|context| async move {
1986            context.with_label(METRICS_PREFIX);
1987        })
1988    }
1989
1990    fn test_metrics_label_empty<R: Runner>(runner: R)
1991    where
1992        R::Context: Metrics,
1993    {
1994        runner.start(|context| async move {
1995            context.with_label("");
1996        })
1997    }
1998
1999    fn test_metrics_label_invalid_first_char<R: Runner>(runner: R)
2000    where
2001        R::Context: Metrics,
2002    {
2003        runner.start(|context| async move {
2004            context.with_label("1invalid");
2005        })
2006    }
2007
2008    fn test_metrics_label_invalid_char<R: Runner>(runner: R)
2009    where
2010        R::Context: Metrics,
2011    {
2012        runner.start(|context| async move {
2013            context.with_label("invalid-label");
2014        })
2015    }
2016
2017    #[test]
2018    fn test_deterministic_future() {
2019        let runner = deterministic::Runner::default();
2020        test_error_future(runner);
2021    }
2022
2023    #[test]
2024    fn test_deterministic_clock_sleep() {
2025        let executor = deterministic::Runner::default();
2026        test_clock_sleep(executor);
2027    }
2028
2029    #[test]
2030    fn test_deterministic_clock_sleep_until() {
2031        let executor = deterministic::Runner::default();
2032        test_clock_sleep_until(executor);
2033    }
2034
2035    #[test]
2036    fn test_deterministic_clock_timeout() {
2037        let executor = deterministic::Runner::default();
2038        test_clock_timeout(executor);
2039    }
2040
2041    #[test]
2042    fn test_deterministic_root_finishes() {
2043        let executor = deterministic::Runner::default();
2044        test_root_finishes(executor);
2045    }
2046
2047    #[test]
2048    fn test_deterministic_spawn_after_abort() {
2049        let executor = deterministic::Runner::default();
2050        test_spawn_after_abort(executor);
2051    }
2052
2053    #[test]
2054    fn test_deterministic_spawn_abort() {
2055        let executor = deterministic::Runner::default();
2056        test_spawn_abort(executor, false, false);
2057    }
2058
2059    #[test]
2060    #[should_panic(expected = "blah")]
2061    fn test_deterministic_panic_aborts_root() {
2062        let runner = deterministic::Runner::default();
2063        test_panic_aborts_root(runner);
2064    }
2065
2066    #[test]
2067    #[should_panic(expected = "blah")]
2068    fn test_deterministic_panic_aborts_root_caught() {
2069        let cfg = deterministic::Config::default().with_catch_panics(true);
2070        let runner = deterministic::Runner::new(cfg);
2071        test_panic_aborts_root(runner);
2072    }
2073
2074    #[test]
2075    #[should_panic(expected = "blah")]
2076    fn test_deterministic_panic_aborts_spawn() {
2077        let executor = deterministic::Runner::default();
2078        test_panic_aborts_spawn(executor);
2079    }
2080
2081    #[test]
2082    fn test_deterministic_panic_aborts_spawn_caught() {
2083        let cfg = deterministic::Config::default().with_catch_panics(true);
2084        let executor = deterministic::Runner::new(cfg);
2085        test_panic_aborts_spawn_caught(executor);
2086    }
2087
2088    #[test]
2089    #[should_panic(expected = "boom")]
2090    fn test_deterministic_multiple_panics() {
2091        let executor = deterministic::Runner::default();
2092        test_multiple_panics(executor);
2093    }
2094
2095    #[test]
2096    fn test_deterministic_multiple_panics_caught() {
2097        let cfg = deterministic::Config::default().with_catch_panics(true);
2098        let executor = deterministic::Runner::new(cfg);
2099        test_multiple_panics_caught(executor);
2100    }
2101
2102    #[test]
2103    fn test_deterministic_select() {
2104        let executor = deterministic::Runner::default();
2105        test_select(executor);
2106    }
2107
2108    #[test]
2109    fn test_deterministic_select_loop() {
2110        let executor = deterministic::Runner::default();
2111        test_select_loop(executor);
2112    }
2113
2114    #[test]
2115    fn test_deterministic_storage_operations() {
2116        let executor = deterministic::Runner::default();
2117        test_storage_operations(executor);
2118    }
2119
2120    #[test]
2121    fn test_deterministic_blob_read_write() {
2122        let executor = deterministic::Runner::default();
2123        test_blob_read_write(executor);
2124    }
2125
2126    #[test]
2127    fn test_deterministic_blob_resize() {
2128        let executor = deterministic::Runner::default();
2129        test_blob_resize(executor);
2130    }
2131
2132    #[test]
2133    fn test_deterministic_many_partition_read_write() {
2134        let executor = deterministic::Runner::default();
2135        test_many_partition_read_write(executor);
2136    }
2137
2138    #[test]
2139    fn test_deterministic_blob_read_past_length() {
2140        let executor = deterministic::Runner::default();
2141        test_blob_read_past_length(executor);
2142    }
2143
2144    #[test]
2145    fn test_deterministic_blob_clone_and_concurrent_read() {
2146        // Run test
2147        let executor = deterministic::Runner::default();
2148        test_blob_clone_and_concurrent_read(executor);
2149    }
2150
2151    #[test]
2152    fn test_deterministic_shutdown() {
2153        let executor = deterministic::Runner::default();
2154        test_shutdown(executor);
2155    }
2156
2157    #[test]
2158    fn test_deterministic_shutdown_multiple_signals() {
2159        let executor = deterministic::Runner::default();
2160        test_shutdown_multiple_signals(executor);
2161    }
2162
2163    #[test]
2164    fn test_deterministic_shutdown_timeout() {
2165        let executor = deterministic::Runner::default();
2166        test_shutdown_timeout(executor);
2167    }
2168
2169    #[test]
2170    fn test_deterministic_shutdown_multiple_stop_calls() {
2171        let executor = deterministic::Runner::default();
2172        test_shutdown_multiple_stop_calls(executor);
2173    }
2174
2175    #[test]
2176    fn test_deterministic_unfulfilled_shutdown() {
2177        let executor = deterministic::Runner::default();
2178        test_unfulfilled_shutdown(executor);
2179    }
2180
2181    #[test]
2182    fn test_deterministic_spawn_dedicated() {
2183        let executor = deterministic::Runner::default();
2184        test_spawn_dedicated(executor);
2185    }
2186
2187    #[test]
2188    fn test_deterministic_spawn() {
2189        let runner = deterministic::Runner::default();
2190        test_spawn(runner);
2191    }
2192
2193    #[test]
2194    fn test_deterministic_spawn_abort_on_parent_abort() {
2195        let runner = deterministic::Runner::default();
2196        test_spawn_abort_on_parent_abort(runner);
2197    }
2198
2199    #[test]
2200    fn test_deterministic_spawn_abort_on_parent_completion() {
2201        let runner = deterministic::Runner::default();
2202        test_spawn_abort_on_parent_completion(runner);
2203    }
2204
2205    #[test]
2206    fn test_deterministic_spawn_cascading_abort() {
2207        let runner = deterministic::Runner::default();
2208        test_spawn_cascading_abort(runner);
2209    }
2210
2211    #[test]
2212    fn test_deterministic_child_survives_sibling_completion() {
2213        let runner = deterministic::Runner::default();
2214        test_child_survives_sibling_completion(runner);
2215    }
2216
2217    #[test]
2218    fn test_deterministic_spawn_clone_chain() {
2219        let runner = deterministic::Runner::default();
2220        test_spawn_clone_chain(runner);
2221    }
2222
2223    #[test]
2224    fn test_deterministic_spawn_sparse_clone_chain() {
2225        let runner = deterministic::Runner::default();
2226        test_spawn_sparse_clone_chain(runner);
2227    }
2228
2229    #[test]
2230    fn test_deterministic_spawn_blocking() {
2231        for dedicated in [false, true] {
2232            let executor = deterministic::Runner::default();
2233            test_spawn_blocking(executor, dedicated);
2234        }
2235    }
2236
2237    #[test]
2238    #[should_panic(expected = "blocking task panicked")]
2239    fn test_deterministic_spawn_blocking_panic() {
2240        for dedicated in [false, true] {
2241            let executor = deterministic::Runner::default();
2242            test_spawn_blocking_panic(executor, dedicated);
2243        }
2244    }
2245
2246    #[test]
2247    fn test_deterministic_spawn_blocking_panic_caught() {
2248        for dedicated in [false, true] {
2249            let cfg = deterministic::Config::default().with_catch_panics(true);
2250            let executor = deterministic::Runner::new(cfg);
2251            test_spawn_blocking_panic_caught(executor, dedicated);
2252        }
2253    }
2254
2255    #[test]
2256    fn test_deterministic_spawn_blocking_abort() {
2257        for (dedicated, blocking) in [(false, true), (true, false)] {
2258            let executor = deterministic::Runner::default();
2259            test_spawn_abort(executor, dedicated, blocking);
2260        }
2261    }
2262
2263    #[test]
2264    fn test_deterministic_circular_reference_prevents_cleanup() {
2265        let executor = deterministic::Runner::default();
2266        test_circular_reference_prevents_cleanup(executor);
2267    }
2268
2269    #[test]
2270    fn test_deterministic_late_waker() {
2271        let executor = deterministic::Runner::default();
2272        test_late_waker(executor);
2273    }
2274
2275    #[test]
2276    fn test_deterministic_metrics() {
2277        let executor = deterministic::Runner::default();
2278        test_metrics(executor);
2279    }
2280
2281    #[test]
2282    #[should_panic]
2283    fn test_deterministic_metrics_label() {
2284        let executor = deterministic::Runner::default();
2285        test_metrics_label(executor);
2286    }
2287
2288    #[test]
2289    #[should_panic(expected = "label must start with [a-zA-Z]")]
2290    fn test_deterministic_metrics_label_empty() {
2291        let executor = deterministic::Runner::default();
2292        test_metrics_label_empty(executor);
2293    }
2294
2295    #[test]
2296    #[should_panic(expected = "label must start with [a-zA-Z]")]
2297    fn test_deterministic_metrics_label_invalid_first_char() {
2298        let executor = deterministic::Runner::default();
2299        test_metrics_label_invalid_first_char(executor);
2300    }
2301
2302    #[test]
2303    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2304    fn test_deterministic_metrics_label_invalid_char() {
2305        let executor = deterministic::Runner::default();
2306        test_metrics_label_invalid_char(executor);
2307    }
2308
2309    #[test_collect_traces]
2310    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2311        let executor = deterministic::Runner::new(deterministic::Config::default());
2312        executor.start(|context| async move {
2313            context
2314                .with_label("test")
2315                .instrumented()
2316                .spawn(|context| async move {
2317                    tracing::info!(field = "test field", "test log");
2318
2319                    context
2320                        .with_label("inner")
2321                        .instrumented()
2322                        .spawn(|_| async move {
2323                            tracing::info!("inner log");
2324                        })
2325                        .await
2326                        .unwrap();
2327                })
2328                .await
2329                .unwrap();
2330        });
2331
2332        let info_traces = traces.get_by_level(Level::INFO);
2333        assert_eq!(info_traces.len(), 2);
2334
2335        // Outer log (single span)
2336        info_traces
2337            .expect_event_at_index(0, |event| {
2338                event.metadata.expect_content_exact("test log")?;
2339                event.metadata.expect_field_count(1)?;
2340                event.metadata.expect_field_exact("field", "test field")?;
2341                event.expect_span_count(1)?;
2342                event.expect_span_at_index(0, |span| {
2343                    span.expect_content_exact("task")?;
2344                    span.expect_field_count(1)?;
2345                    span.expect_field_exact("name", "test")
2346                })
2347            })
2348            .unwrap();
2349
2350        info_traces
2351            .expect_event_at_index(1, |event| {
2352                event.metadata.expect_content_exact("inner log")?;
2353                event.metadata.expect_field_count(0)?;
2354                event.expect_span_count(1)?;
2355                event.expect_span_at_index(0, |span| {
2356                    span.expect_content_exact("task")?;
2357                    span.expect_field_count(1)?;
2358                    span.expect_field_exact("name", "test_inner")
2359                })
2360            })
2361            .unwrap();
2362    }
2363
2364    #[test]
2365    fn test_deterministic_resolver() {
2366        let executor = deterministic::Runner::default();
2367        executor.start(|context| async move {
2368            // Register DNS mappings
2369            let ip1: IpAddr = "192.168.1.1".parse().unwrap();
2370            let ip2: IpAddr = "192.168.1.2".parse().unwrap();
2371            context.resolver_register("example.com", Some(vec![ip1, ip2]));
2372
2373            // Resolve registered hostname
2374            let addrs = context.resolve("example.com").await.unwrap();
2375            assert_eq!(addrs, vec![ip1, ip2]);
2376
2377            // Resolve unregistered hostname
2378            let result = context.resolve("unknown.com").await;
2379            assert!(matches!(result, Err(Error::ResolveFailed(_))));
2380
2381            // Remove mapping
2382            context.resolver_register("example.com", None);
2383            let result = context.resolve("example.com").await;
2384            assert!(matches!(result, Err(Error::ResolveFailed(_))));
2385        });
2386    }
2387
2388    #[test]
2389    fn test_tokio_error_future() {
2390        let runner = tokio::Runner::default();
2391        test_error_future(runner);
2392    }
2393
2394    #[test]
2395    fn test_tokio_clock_sleep() {
2396        let executor = tokio::Runner::default();
2397        test_clock_sleep(executor);
2398    }
2399
2400    #[test]
2401    fn test_tokio_clock_sleep_until() {
2402        let executor = tokio::Runner::default();
2403        test_clock_sleep_until(executor);
2404    }
2405
2406    #[test]
2407    fn test_tokio_clock_timeout() {
2408        let executor = tokio::Runner::default();
2409        test_clock_timeout(executor);
2410    }
2411
2412    #[test]
2413    fn test_tokio_root_finishes() {
2414        let executor = tokio::Runner::default();
2415        test_root_finishes(executor);
2416    }
2417
2418    #[test]
2419    fn test_tokio_spawn_after_abort() {
2420        let executor = tokio::Runner::default();
2421        test_spawn_after_abort(executor);
2422    }
2423
2424    #[test]
2425    fn test_tokio_spawn_abort() {
2426        let executor = tokio::Runner::default();
2427        test_spawn_abort(executor, false, false);
2428    }
2429
2430    #[test]
2431    #[should_panic(expected = "blah")]
2432    fn test_tokio_panic_aborts_root() {
2433        let executor = tokio::Runner::default();
2434        test_panic_aborts_root(executor);
2435    }
2436
2437    #[test]
2438    #[should_panic(expected = "blah")]
2439    fn test_tokio_panic_aborts_root_caught() {
2440        let cfg = tokio::Config::default().with_catch_panics(true);
2441        let executor = tokio::Runner::new(cfg);
2442        test_panic_aborts_root(executor);
2443    }
2444
2445    #[test]
2446    #[should_panic(expected = "blah")]
2447    fn test_tokio_panic_aborts_spawn() {
2448        let executor = tokio::Runner::default();
2449        test_panic_aborts_spawn(executor);
2450    }
2451
2452    #[test]
2453    fn test_tokio_panic_aborts_spawn_caught() {
2454        let cfg = tokio::Config::default().with_catch_panics(true);
2455        let executor = tokio::Runner::new(cfg);
2456        test_panic_aborts_spawn_caught(executor);
2457    }
2458
2459    #[test]
2460    #[should_panic(expected = "boom")]
2461    fn test_tokio_multiple_panics() {
2462        let executor = tokio::Runner::default();
2463        test_multiple_panics(executor);
2464    }
2465
2466    #[test]
2467    fn test_tokio_multiple_panics_caught() {
2468        let cfg = tokio::Config::default().with_catch_panics(true);
2469        let executor = tokio::Runner::new(cfg);
2470        test_multiple_panics_caught(executor);
2471    }
2472
2473    #[test]
2474    fn test_tokio_select() {
2475        let executor = tokio::Runner::default();
2476        test_select(executor);
2477    }
2478
2479    #[test]
2480    fn test_tokio_select_loop() {
2481        let executor = tokio::Runner::default();
2482        test_select_loop(executor);
2483    }
2484
2485    #[test]
2486    fn test_tokio_storage_operations() {
2487        let executor = tokio::Runner::default();
2488        test_storage_operations(executor);
2489    }
2490
2491    #[test]
2492    fn test_tokio_blob_read_write() {
2493        let executor = tokio::Runner::default();
2494        test_blob_read_write(executor);
2495    }
2496
2497    #[test]
2498    fn test_tokio_blob_resize() {
2499        let executor = tokio::Runner::default();
2500        test_blob_resize(executor);
2501    }
2502
2503    #[test]
2504    fn test_tokio_many_partition_read_write() {
2505        let executor = tokio::Runner::default();
2506        test_many_partition_read_write(executor);
2507    }
2508
2509    #[test]
2510    fn test_tokio_blob_read_past_length() {
2511        let executor = tokio::Runner::default();
2512        test_blob_read_past_length(executor);
2513    }
2514
2515    #[test]
2516    fn test_tokio_blob_clone_and_concurrent_read() {
2517        // Run test
2518        let executor = tokio::Runner::default();
2519        test_blob_clone_and_concurrent_read(executor);
2520    }
2521
2522    #[test]
2523    fn test_tokio_shutdown() {
2524        let executor = tokio::Runner::default();
2525        test_shutdown(executor);
2526    }
2527
2528    #[test]
2529    fn test_tokio_shutdown_multiple_signals() {
2530        let executor = tokio::Runner::default();
2531        test_shutdown_multiple_signals(executor);
2532    }
2533
2534    #[test]
2535    fn test_tokio_shutdown_timeout() {
2536        let executor = tokio::Runner::default();
2537        test_shutdown_timeout(executor);
2538    }
2539
2540    #[test]
2541    fn test_tokio_shutdown_multiple_stop_calls() {
2542        let executor = tokio::Runner::default();
2543        test_shutdown_multiple_stop_calls(executor);
2544    }
2545
2546    #[test]
2547    fn test_tokio_unfulfilled_shutdown() {
2548        let executor = tokio::Runner::default();
2549        test_unfulfilled_shutdown(executor);
2550    }
2551
2552    #[test]
2553    fn test_tokio_spawn_dedicated() {
2554        let executor = tokio::Runner::default();
2555        test_spawn_dedicated(executor);
2556    }
2557
2558    #[test]
2559    fn test_tokio_spawn() {
2560        let runner = tokio::Runner::default();
2561        test_spawn(runner);
2562    }
2563
2564    #[test]
2565    fn test_tokio_spawn_abort_on_parent_abort() {
2566        let runner = tokio::Runner::default();
2567        test_spawn_abort_on_parent_abort(runner);
2568    }
2569
2570    #[test]
2571    fn test_tokio_spawn_abort_on_parent_completion() {
2572        let runner = tokio::Runner::default();
2573        test_spawn_abort_on_parent_completion(runner);
2574    }
2575
2576    #[test]
2577    fn test_tokio_spawn_cascading_abort() {
2578        let runner = tokio::Runner::default();
2579        test_spawn_cascading_abort(runner);
2580    }
2581
2582    #[test]
2583    fn test_tokio_child_survives_sibling_completion() {
2584        let runner = tokio::Runner::default();
2585        test_child_survives_sibling_completion(runner);
2586    }
2587
2588    #[test]
2589    fn test_tokio_spawn_clone_chain() {
2590        let runner = tokio::Runner::default();
2591        test_spawn_clone_chain(runner);
2592    }
2593
2594    #[test]
2595    fn test_tokio_spawn_sparse_clone_chain() {
2596        let runner = tokio::Runner::default();
2597        test_spawn_sparse_clone_chain(runner);
2598    }
2599
2600    #[test]
2601    fn test_tokio_spawn_blocking() {
2602        for dedicated in [false, true] {
2603            let executor = tokio::Runner::default();
2604            test_spawn_blocking(executor, dedicated);
2605        }
2606    }
2607
2608    #[test]
2609    #[should_panic(expected = "blocking task panicked")]
2610    fn test_tokio_spawn_blocking_panic() {
2611        for dedicated in [false, true] {
2612            let executor = tokio::Runner::default();
2613            test_spawn_blocking_panic(executor, dedicated);
2614        }
2615    }
2616
2617    #[test]
2618    fn test_tokio_spawn_blocking_panic_caught() {
2619        for dedicated in [false, true] {
2620            let cfg = tokio::Config::default().with_catch_panics(true);
2621            let executor = tokio::Runner::new(cfg);
2622            test_spawn_blocking_panic_caught(executor, dedicated);
2623        }
2624    }
2625
2626    #[test]
2627    fn test_tokio_spawn_blocking_abort() {
2628        for (dedicated, blocking) in [(false, true), (true, false)] {
2629            let executor = tokio::Runner::default();
2630            test_spawn_abort(executor, dedicated, blocking);
2631        }
2632    }
2633
2634    #[test]
2635    fn test_tokio_circular_reference_prevents_cleanup() {
2636        let executor = tokio::Runner::default();
2637        test_circular_reference_prevents_cleanup(executor);
2638    }
2639
2640    #[test]
2641    fn test_tokio_late_waker() {
2642        let executor = tokio::Runner::default();
2643        test_late_waker(executor);
2644    }
2645
2646    #[test]
2647    fn test_tokio_metrics() {
2648        let executor = tokio::Runner::default();
2649        test_metrics(executor);
2650    }
2651
2652    #[test]
2653    #[should_panic]
2654    fn test_tokio_metrics_label() {
2655        let executor = tokio::Runner::default();
2656        test_metrics_label(executor);
2657    }
2658
2659    #[test]
2660    #[should_panic(expected = "label must start with [a-zA-Z]")]
2661    fn test_tokio_metrics_label_empty() {
2662        let executor = tokio::Runner::default();
2663        test_metrics_label_empty(executor);
2664    }
2665
2666    #[test]
2667    #[should_panic(expected = "label must start with [a-zA-Z]")]
2668    fn test_tokio_metrics_label_invalid_first_char() {
2669        let executor = tokio::Runner::default();
2670        test_metrics_label_invalid_first_char(executor);
2671    }
2672
2673    #[test]
2674    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2675    fn test_tokio_metrics_label_invalid_char() {
2676        let executor = tokio::Runner::default();
2677        test_metrics_label_invalid_char(executor);
2678    }
2679
2680    #[test]
2681    fn test_tokio_process_rss_metric() {
2682        let executor = tokio::Runner::default();
2683        executor.start(|context| async move {
2684            loop {
2685                // Wait for RSS metric to be available
2686                let metrics = context.encode();
2687                if !metrics.contains("runtime_process_rss") {
2688                    context.sleep(Duration::from_millis(100)).await;
2689                    continue;
2690                }
2691
2692                // Verify the RSS value is eventually populated (greater than 0)
2693                for line in metrics.lines() {
2694                    if line.starts_with("runtime_process_rss")
2695                        && !line.starts_with("runtime_process_rss{")
2696                    {
2697                        let parts: Vec<&str> = line.split_whitespace().collect();
2698                        if parts.len() >= 2 {
2699                            let rss_value: i64 =
2700                                parts[1].parse().expect("Failed to parse RSS value");
2701                            if rss_value > 0 {
2702                                return;
2703                            }
2704                        }
2705                    }
2706                }
2707            }
2708        });
2709    }
2710
2711    #[test]
2712    fn test_tokio_telemetry() {
2713        let executor = tokio::Runner::default();
2714        executor.start(|context| async move {
2715            // Define the server address
2716            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2717
2718            // Configure telemetry
2719            tokio::telemetry::init(
2720                context.with_label("metrics"),
2721                tokio::telemetry::Logging {
2722                    level: Level::INFO,
2723                    json: false,
2724                },
2725                Some(address),
2726                None,
2727            );
2728
2729            // Register a test metric
2730            let counter: Counter<u64> = Counter::default();
2731            context.register("test_counter", "Test counter", counter.clone());
2732            counter.inc();
2733
2734            // Helper functions to parse HTTP response
2735            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
2736                let mut line = Vec::new();
2737                loop {
2738                    let byte = stream.recv(vec![0; 1]).await?;
2739                    if byte[0] == b'\n' {
2740                        if line.last() == Some(&b'\r') {
2741                            line.pop(); // Remove trailing \r
2742                        }
2743                        break;
2744                    }
2745                    line.push(byte[0]);
2746                }
2747                String::from_utf8(line).map_err(|_| Error::ReadFailed)
2748            }
2749
2750            async fn read_headers<St: Stream>(
2751                stream: &mut St,
2752            ) -> Result<HashMap<String, String>, Error> {
2753                let mut headers = HashMap::new();
2754                loop {
2755                    let line = read_line(stream).await?;
2756                    if line.is_empty() {
2757                        break;
2758                    }
2759                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
2760                    if parts.len() == 2 {
2761                        headers.insert(parts[0].to_string(), parts[1].to_string());
2762                    }
2763                }
2764                Ok(headers)
2765            }
2766
2767            async fn read_body<St: Stream>(
2768                stream: &mut St,
2769                content_length: usize,
2770            ) -> Result<String, Error> {
2771                let read = stream.recv(vec![0; content_length]).await?;
2772                String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
2773            }
2774
2775            // Simulate a client connecting to the server
2776            let client_handle = context
2777                .with_label("client")
2778                .spawn(move |context| async move {
2779                    let (mut sink, mut stream) = loop {
2780                        match context.dial(address).await {
2781                            Ok((sink, stream)) => break (sink, stream),
2782                            Err(e) => {
2783                                // The client may be polled before the server is ready, that's alright!
2784                                error!(err =?e, "failed to connect");
2785                                context.sleep(Duration::from_millis(10)).await;
2786                            }
2787                        }
2788                    };
2789
2790                    // Send a GET request to the server
2791                    let request = format!(
2792                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2793                    );
2794                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
2795
2796                    // Read and verify the HTTP status line
2797                    let status_line = read_line(&mut stream).await.unwrap();
2798                    assert_eq!(status_line, "HTTP/1.1 200 OK");
2799
2800                    // Read and parse headers
2801                    let headers = read_headers(&mut stream).await.unwrap();
2802                    println!("Headers: {headers:?}");
2803                    let content_length = headers
2804                        .get("content-length")
2805                        .unwrap()
2806                        .parse::<usize>()
2807                        .unwrap();
2808
2809                    // Read and verify the body
2810                    let body = read_body(&mut stream, content_length).await.unwrap();
2811                    assert!(body.contains("test_counter_total 1"));
2812                });
2813
2814            // Wait for the client task to complete
2815            client_handle.await.unwrap();
2816        });
2817    }
2818
2819    #[test]
2820    fn test_tokio_resolver() {
2821        let executor = tokio::Runner::default();
2822        executor.start(|context| async move {
2823            let addrs = context.resolve("localhost").await.unwrap();
2824            assert!(!addrs.is_empty());
2825            for addr in addrs {
2826                assert!(
2827                    addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
2828                        || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
2829                );
2830            }
2831        });
2832    }
2833}