commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20#![doc(
21    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22    html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_macros::select;
26use commonware_utils::StableBuf;
27use prometheus_client::registry::Metric;
28use std::{
29    future::Future,
30    io::Error as IoError,
31    net::SocketAddr,
32    time::{Duration, SystemTime},
33};
34use thiserror::Error;
35
36#[macro_use]
37mod macros;
38
39pub mod deterministic;
40pub mod mocks;
41cfg_if::cfg_if! {
42    if #[cfg(not(target_arch = "wasm32"))] {
43        pub mod tokio;
44        pub mod benchmarks;
45    }
46}
47mod network;
48mod process;
49mod storage;
50pub mod telemetry;
51mod utils;
52pub use utils::*;
53#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
54mod iouring;
55
/// Prefix reserved for metrics emitted by the runtime itself.
///
/// [`Metrics::scoped_label`] asserts that no scoped label starts with this value.
const METRICS_PREFIX: &str = "runtime";
58
59/// Errors that can occur when interacting with the runtime.
60#[derive(Error, Debug)]
61pub enum Error {
62    #[error("exited")]
63    Exited,
64    #[error("closed")]
65    Closed,
66    #[error("timeout")]
67    Timeout,
68    #[error("bind failed")]
69    BindFailed,
70    #[error("connection failed")]
71    ConnectionFailed,
72    #[error("write failed")]
73    WriteFailed,
74    #[error("read failed")]
75    ReadFailed,
76    #[error("send failed")]
77    SendFailed,
78    #[error("recv failed")]
79    RecvFailed,
80    #[error("partition creation failed: {0}")]
81    PartitionCreationFailed(String),
82    #[error("partition missing: {0}")]
83    PartitionMissing(String),
84    #[error("partition corrupt: {0}")]
85    PartitionCorrupt(String),
86    #[error("blob open failed: {0}/{1} error: {2}")]
87    BlobOpenFailed(String, String, IoError),
88    #[error("blob missing: {0}/{1}")]
89    BlobMissing(String, String),
90    #[error("blob resize failed: {0}/{1} error: {2}")]
91    BlobResizeFailed(String, String, IoError),
92    #[error("blob sync failed: {0}/{1} error: {2}")]
93    BlobSyncFailed(String, String, IoError),
94    #[error("blob insufficient length")]
95    BlobInsufficientLength,
96    #[error("offset overflow")]
97    OffsetOverflow,
98    #[error("io error: {0}")]
99    Io(#[from] IoError),
100}
101
/// Entry point of a runtime: every task scheduler implements this trait so that a root
/// task can be started on it.
pub trait Runner {
    /// The environment handed to tasks.
    ///
    /// A context may expose services such as:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Run the root task produced by `f` and return its output.
    ///
    /// Once this function returns, every spawned task is canceled. When clean shutdown
    /// cannot be expressed through `Drop`, coordinate it with [Spawner::stop] and
    /// [Spawner::stopped] instead.
    fn start<F: FnOnce(Self::Context) -> Fut, Fut: Future>(self, f: F) -> Fut::Output;
}
122
123/// Interface that any task scheduler must implement to spawn tasks.
124pub trait Spawner: Clone + Send + Sync + 'static {
125    /// Return a [`Spawner`] that schedules tasks onto the runtime's shared executor.
126    ///
127    /// Set `blocking` to `true` when the task may hold the thread for a short, blocking operation.
128    /// Runtimes can use this hint to move the work to a blocking-friendly pool so asynchronous
129    /// tasks on a work-stealing executor are not starved. For long-lived, blocking work, use
130    /// [`Spawner::dedicated`] instead.
131    ///
132    /// The shared executor with `blocking == false` is the default spawn mode.
133    fn shared(self, blocking: bool) -> Self;
134
135    /// Return a [`Spawner`] that runs tasks on a dedicated thread when the runtime supports it.
136    ///
137    /// Reserve this for long-lived or prioritized tasks that should not compete for resources in the
138    /// shared executor.
139    ///
140    /// This is not the default behavior. See [`Spawner::shared`] for more information.
141    fn dedicated(self) -> Self;
142
143    /// Return a [`Spawner`] that instruments the next spawned task with the label of the spawning context.
144    fn instrumented(self) -> Self;
145
146    /// Spawn a task with the current context.
147    ///
148    /// Unlike directly awaiting a future, the task starts running immediately even if the caller
149    /// never awaits the returned [`Handle`].
150    ///
151    /// # Mandatory Supervision
152    ///
153    /// All tasks are supervised. When a parent task finishes or is aborted, all its descendants are aborted.
154    ///
155    /// Spawn consumes the current task and provides a new child context to the spawned task. Likewise, cloning
156    /// a context (either via [`Clone::clone`] or [`Metrics::with_label`]) returns a child context.
157    ///
158    /// ```txt
159    /// ctx_a
160    ///   |
161    ///   +-- clone() ---> ctx_c
162    ///   |                  |
163    ///   |                  +-- spawn() ---> Task C (ctx_d)
164    ///   |
165    ///   +-- spawn() ---> Task A (ctx_b)
166    ///                              |
167    ///                              +-- spawn() ---> Task B (ctx_e)
168    ///
169    /// Task A finishes or aborts --> Task B and Task C are aborted
170    /// ```
171    ///
172    /// # Spawn Configuration
173    ///
174    /// When a context is cloned (either via [`Clone::clone`] or [`Metrics::with_label`]) or provided via
175    /// [`Spawner::spawn`], any configuration made via [`Spawner::dedicated`] or [`Spawner::shared`] is reset.
176    ///
177    /// Child tasks should assume they start from a clean configuration without needing to inspect how their
178    /// parent was configured.
179    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
180    where
181        F: FnOnce(Self) -> Fut + Send + 'static,
182        Fut: Future<Output = T> + Send + 'static,
183        T: Send + 'static;
184
185    /// Signals the runtime to stop execution and waits for all outstanding tasks
186    /// to perform any required cleanup and exit.
187    ///
188    /// This method does not actually kill any tasks but rather signals to them, using
189    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
190    /// It then waits for all [signal::Signal] references to be dropped before returning.
191    ///
192    /// ## Multiple Stop Calls
193    ///
194    /// This method is idempotent and safe to call multiple times concurrently (on
195    /// different instances of the same context since it consumes `self`). The first
196    /// call initiates shutdown with the provided `value`, and all subsequent calls
197    /// will wait for the same completion regardless of their `value` parameter, i.e.
198    /// the original `value` from the first call is preserved.
199    ///
200    /// ## Timeout
201    ///
202    /// If a timeout is provided, the method will return an error if all [signal::Signal]
203    /// references have not been dropped within the specified duration.
204    fn stop(
205        self,
206        value: i32,
207        timeout: Option<Duration>,
208    ) -> impl Future<Output = Result<(), Error>> + Send;
209
210    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
211    /// any task.
212    ///
213    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
214    /// immediately. The [signal::Signal] returned will always resolve to the value of the
215    /// first [Spawner::stop] call.
216    fn stopped(&self) -> signal::Signal;
217}
218
219/// Interface to register and encode metrics.
220pub trait Metrics: Clone + Send + Sync + 'static {
221    /// Get the current label of the context.
222    fn label(&self) -> String;
223
224    /// Create a new instance of `Metrics` with the given label appended to the end
225    /// of the current `Metrics` label.
226    ///
227    /// This is commonly used to create a nested context for `register`.
228    ///
229    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
230    /// label (reserved for metrics for the runtime).
231    fn with_label(&self, label: &str) -> Self;
232
233    /// Prefix the given label with the current context's label.
234    ///
235    /// Unlike `with_label`, this method does not create a new context.
236    fn scoped_label(&self, label: &str) -> String {
237        let label = if self.label().is_empty() {
238            label.to_string()
239        } else {
240            format!("{}_{}", self.label(), label)
241        };
242        assert!(
243            !label.starts_with(METRICS_PREFIX),
244            "using runtime label is not allowed"
245        );
246        label
247    }
248
249    /// Register a metric with the runtime.
250    ///
251    /// Any registered metric will include (as a prefix) the label of the current context.
252    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
253
254    /// Encode all metrics into a buffer.
255    fn encode(&self) -> String;
256}
257
258/// Interface that any task scheduler must implement to provide
259/// time-based operations.
260///
261/// It is necessary to mock time to provide deterministic execution
262/// of arbitrary tasks.
263pub trait Clock: Clone + Send + Sync + 'static {
264    /// Returns the current time.
265    fn current(&self) -> SystemTime;
266
267    /// Sleep for the given duration.
268    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
269
270    /// Sleep until the given deadline.
271    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
272
273    /// Await a future with a timeout, returning `Error::Timeout` if it expires.
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// use std::time::Duration;
279    /// use commonware_runtime::{deterministic, Error, Runner, Clock};
280    ///
281    /// let executor = deterministic::Runner::default();
282    /// executor.start(|context| async move {
283    ///     match context
284    ///         .timeout(Duration::from_millis(100), async { 42 })
285    ///         .await
286    ///     {
287    ///         Ok(value) => assert_eq!(value, 42),
288    ///         Err(Error::Timeout) => panic!("should not timeout"),
289    ///         Err(e) => panic!("unexpected error: {:?}", e),
290    ///     }
291    /// });
292    /// ```
293    fn timeout<F, T>(
294        &self,
295        duration: Duration,
296        future: F,
297    ) -> impl Future<Output = Result<T, Error>> + Send + '_
298    where
299        F: Future<Output = T> + Send + 'static,
300        T: Send + 'static,
301    {
302        async move {
303            select! {
304                result = future => {
305                    Ok(result)
306                },
307                _ = self.sleep(duration) => {
308                    Err(Error::Timeout)
309                },
310            }
311        }
312    }
313}
314
315cfg_if::cfg_if! {
316    if #[cfg(feature = "external")] {
317        /// Interface that runtimes can implement to constrain the execution latency of a future.
318        pub trait Pacer: Clock + Clone + Send + Sync + 'static {
319            /// Defer completion of a future until a specified `latency` has elapsed. If the future is
320            /// not yet ready at the desired time of completion, the runtime will block until the future
321            /// is ready.
322            ///
323            /// In [crate::deterministic], this is used to ensure interactions with external systems can
324            /// be interacted with deterministically. In [crate::tokio], this is a no-op (allows
325            /// multiple runtimes to be tested with no code changes).
326            ///
327            /// # Setting Latency
328            ///
329            /// `pace` is not meant to be a time penalty applied to awaited futures and should be set to
330            /// the expected resolution latency of the future. To better explore the possible behavior of an
331            /// application, users can set latency to a randomly chosen value in the range of
332            /// `[expected latency / 2, expected latency * 2]`.
333            ///
334            /// # Warning
335            ///
336            /// Because `pace` blocks if the future is not ready, it is important that the future's completion
337            /// doesn't require anything in the current thread to complete (or else it will deadlock).
338            fn pace<'a, F, T>(
339                &'a self,
340                latency: Duration,
341                future: F,
342            ) -> impl Future<Output = T> + Send + 'a
343            where
344                F: Future<Output = T> + Send + 'a,
345                T: Send + 'a;
346        }
347
348        /// Extension trait that makes it more ergonomic to use [Pacer].
349        ///
350        /// This inverts the call-site of [`Pacer::pace`] by letting the future itself request how the
351        /// runtime should delay completion relative to the clock.
352        pub trait FutureExt: Future + Send + Sized {
353            /// Delay completion of the future until a specified `latency` on `pacer`.
354            fn pace<'a, E>(
355                self,
356                pacer: &'a E,
357                latency: Duration,
358            ) -> impl Future<Output = Self::Output> + Send + 'a
359            where
360                E: Pacer + 'a,
361                Self: Send + 'a,
362                Self::Output: Send + 'a,
363            {
364                pacer.pace(latency, self)
365            }
366        }
367
368        impl<F> FutureExt for F where F: Future + Send {}
369    }
370}
371
372/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
373pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
374
375/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
376pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
377
378/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
379pub type ListenerOf<N> = <N as crate::Network>::Listener;
380
381/// Interface that any runtime must implement to create
382/// network connections.
383pub trait Network: Clone + Send + Sync + 'static {
384    /// The type of [Listener] that's returned when binding to a socket.
385    /// Accepting a connection returns a [Sink] and [Stream] which are defined
386    /// by the [Listener] and used to send and receive data over the connection.
387    type Listener: Listener;
388
389    /// Bind to the given socket address.
390    fn bind(
391        &self,
392        socket: SocketAddr,
393    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
394
395    /// Dial the given socket address.
396    fn dial(
397        &self,
398        socket: SocketAddr,
399    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
400}
401
402/// Interface that any runtime must implement to handle
403/// incoming network connections.
404pub trait Listener: Sync + Send + 'static {
405    /// The type of [Sink] that's returned when accepting a connection.
406    /// This is used to send data to the remote connection.
407    type Sink: Sink;
408    /// The type of [Stream] that's returned when accepting a connection.
409    /// This is used to receive data from the remote connection.
410    type Stream: Stream;
411
412    /// Accept an incoming connection.
413    fn accept(
414        &mut self,
415    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
416
417    /// Returns the local address of the listener.
418    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
419}
420
421/// Interface that any runtime must implement to send
422/// messages over a network connection.
423pub trait Sink: Sync + Send + 'static {
424    /// Send a message to the sink.
425    fn send(
426        &mut self,
427        msg: impl Into<StableBuf> + Send,
428    ) -> impl Future<Output = Result<(), Error>> + Send;
429}
430
431/// Interface that any runtime must implement to receive
432/// messages over a network connection.
433pub trait Stream: Sync + Send + 'static {
434    /// Receive a message from the stream, storing it in the given buffer.
435    /// Reads exactly the number of bytes that fit in the buffer.
436    fn recv(
437        &mut self,
438        buf: impl Into<StableBuf> + Send,
439    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
440}
441
442/// Interface to interact with storage.
443///
444///
445/// To support storage implementations that enable concurrent reads and
446/// writes, blobs are responsible for maintaining synchronization.
447///
448/// Storage can be backed by a local filesystem, cloud storage, etc.
449pub trait Storage: Clone + Send + Sync + 'static {
450    /// The readable/writeable storage buffer that can be opened by this Storage.
451    type Blob: Blob;
452
453    /// Open an existing blob in a given partition or create a new one, returning
454    /// the blob and its length.
455    ///
456    /// Multiple instances of the same blob can be opened concurrently, however,
457    /// writing to the same blob concurrently may lead to undefined behavior.
458    ///
459    /// An Ok result indicates the blob is durably created (or already exists).
460    fn open(
461        &self,
462        partition: &str,
463        name: &[u8],
464    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
465
466    /// Remove a blob from a given partition.
467    ///
468    /// If no `name` is provided, the entire partition is removed.
469    ///
470    /// An Ok result indicates the blob is durably removed.
471    fn remove(
472        &self,
473        partition: &str,
474        name: Option<&[u8]>,
475    ) -> impl Future<Output = Result<(), Error>> + Send;
476
477    /// Return all blobs in a given partition.
478    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
479}
480
481/// Interface to read and write to a blob.
482///
483/// To support blob implementations that enable concurrent reads and
484/// writes, blobs are responsible for maintaining synchronization.
485///
486/// Cloning a blob is similar to wrapping a single file descriptor in
487/// a lock whereas opening a new blob (of the same name) is similar to
488/// opening a new file descriptor. If multiple blobs are opened with the same
489/// name, they are not expected to coordinate access to underlying storage
490/// and writing to both is undefined behavior.
491///
492/// When a blob is dropped, any unsynced changes may be discarded. Implementations
493/// may attempt to sync during drop but errors will go unhandled. Call `sync`
494/// before dropping to ensure all changes are durably persisted.
495#[allow(clippy::len_without_is_empty)]
496pub trait Blob: Clone + Send + Sync + 'static {
497    /// Read from the blob at the given offset.
498    ///
499    /// `read_at` does not return the number of bytes read because it
500    /// only returns once the entire buffer has been filled.
501    fn read_at(
502        &self,
503        buf: impl Into<StableBuf> + Send,
504        offset: u64,
505    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
506
507    /// Write `buf` to the blob at the given offset.
508    fn write_at(
509        &self,
510        buf: impl Into<StableBuf> + Send,
511        offset: u64,
512    ) -> impl Future<Output = Result<(), Error>> + Send;
513
514    /// Resize the blob to the given length.
515    ///
516    /// If the length is greater than the current length, the blob is extended with zeros.
517    /// If the length is less than the current length, the blob is resized.
518    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
519
520    /// Ensure all pending data is durably persisted.
521    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
522}
523
524#[cfg(test)]
525mod tests {
526    use super::*;
527    use crate::telemetry::traces::collector::TraceStorage;
528    use bytes::Bytes;
529    use commonware_macros::{select, test_collect_traces};
530    use futures::{
531        channel::{mpsc, oneshot},
532        future::{pending, ready},
533        join, pin_mut, FutureExt, SinkExt, StreamExt,
534    };
535    use prometheus_client::metrics::counter::Counter;
536    use std::{
537        collections::HashMap,
538        pin::Pin,
539        str::FromStr,
540        sync::{
541            atomic::{AtomicU32, Ordering},
542            Arc, Mutex,
543        },
544        task::{Context as TContext, Poll, Waker},
545    };
546    use tracing::{error, Level};
547    use utils::reschedule;
548
549    fn test_error_future<R: Runner>(runner: R) {
550        async fn error_future() -> Result<&'static str, &'static str> {
551            Err("An error occurred")
552        }
553        let result = runner.start(|_| error_future());
554        assert_eq!(result, Err("An error occurred"));
555    }
556
557    fn test_clock_sleep<R: Runner>(runner: R)
558    where
559        R::Context: Spawner + Clock,
560    {
561        runner.start(|context| async move {
562            // Capture initial time
563            let start = context.current();
564            let sleep_duration = Duration::from_millis(10);
565            context.sleep(sleep_duration).await;
566
567            // After run, time should have advanced
568            let end = context.current();
569            assert!(end.duration_since(start).unwrap() >= sleep_duration);
570        });
571    }
572
573    fn test_clock_sleep_until<R: Runner>(runner: R)
574    where
575        R::Context: Spawner + Clock + Metrics,
576    {
577        runner.start(|context| async move {
578            // Trigger sleep
579            let now = context.current();
580            context.sleep_until(now + Duration::from_millis(100)).await;
581
582            // Ensure slept duration has elapsed
583            let elapsed = now.elapsed().unwrap();
584            assert!(elapsed >= Duration::from_millis(100));
585        });
586    }
587
588    fn test_clock_timeout<R: Runner>(runner: R)
589    where
590        R::Context: Spawner + Clock,
591    {
592        runner.start(|context| async move {
593            // Future completes before timeout
594            let result = context
595                .timeout(Duration::from_millis(100), async { "success" })
596                .await;
597            assert_eq!(result.unwrap(), "success");
598
599            // Future exceeds timeout duration
600            let result = context
601                .timeout(Duration::from_millis(50), pending::<()>())
602                .await;
603            assert!(matches!(result, Err(Error::Timeout)));
604
605            // Future completes within timeout
606            let result = context
607                .timeout(
608                    Duration::from_millis(100),
609                    context.sleep(Duration::from_millis(50)),
610                )
611                .await;
612            assert!(result.is_ok());
613        });
614    }
615
616    fn test_root_finishes<R: Runner>(runner: R)
617    where
618        R::Context: Spawner,
619    {
620        runner.start(|context| async move {
621            context.spawn(|_| async move {
622                loop {
623                    reschedule().await;
624                }
625            });
626        });
627    }
628
629    fn test_spawn_after_abort<R>(runner: R)
630    where
631        R: Runner,
632        R::Context: Spawner + Clone,
633    {
634        runner.start(|context| async move {
635            // Create a child context
636            let child = context.clone();
637
638            // Spawn parent and abort
639            let parent_handle = context.spawn(move |_| async move {
640                pending::<()>().await;
641            });
642            parent_handle.abort();
643
644            // Spawn child and ensure it aborts
645            let child_handle = child.spawn(move |_| async move {
646                pending::<()>().await;
647            });
648            assert!(matches!(child_handle.await, Err(Error::Closed)));
649        });
650    }
651
652    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
653    where
654        R::Context: Spawner,
655    {
656        runner.start(|context| async move {
657            let context = if dedicated {
658                assert!(!blocking);
659                context.dedicated()
660            } else {
661                context.shared(blocking)
662            };
663
664            let handle = context.spawn(|_| async move {
665                loop {
666                    reschedule().await;
667                }
668            });
669            handle.abort();
670            assert!(matches!(handle.await, Err(Error::Closed)));
671        });
672    }
673
674    fn test_panic_aborts_root<R: Runner>(runner: R) {
675        let result: Result<(), Error> = runner.start(|_| async move {
676            panic!("blah");
677        });
678        result.unwrap_err();
679    }
680
681    fn test_panic_aborts_spawn<R: Runner>(runner: R)
682    where
683        R::Context: Spawner + Clock,
684    {
685        runner.start(|context| async move {
686            context.clone().spawn(|_| async move {
687                panic!("blah");
688            });
689
690            // Loop until panic
691            loop {
692                context.sleep(Duration::from_millis(100)).await;
693            }
694        });
695    }
696
697    fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
698    where
699        R::Context: Spawner + Clock,
700    {
701        let result: Result<(), Error> = runner.start(|context| async move {
702            let result = context.clone().spawn(|_| async move {
703                panic!("blah");
704            });
705            result.await
706        });
707        assert!(matches!(result, Err(Error::Exited)));
708    }
709
710    fn test_multiple_panics<R: Runner>(runner: R)
711    where
712        R::Context: Spawner + Clock,
713    {
714        runner.start(|context| async move {
715            context.clone().spawn(|_| async move {
716                panic!("boom 1");
717            });
718            context.clone().spawn(|_| async move {
719                panic!("boom 2");
720            });
721            context.clone().spawn(|_| async move {
722                panic!("boom 3");
723            });
724
725            // Loop until panic
726            loop {
727                context.sleep(Duration::from_millis(100)).await;
728            }
729        });
730    }
731
732    fn test_multiple_panics_caught<R: Runner>(runner: R)
733    where
734        R::Context: Spawner + Clock,
735    {
736        let (res1, res2, res3) = runner.start(|context| async move {
737            let handle1 = context.clone().spawn(|_| async move {
738                panic!("boom 1");
739            });
740            let handle2 = context.clone().spawn(|_| async move {
741                panic!("boom 2");
742            });
743            let handle3 = context.clone().spawn(|_| async move {
744                panic!("boom 3");
745            });
746
747            join!(handle1, handle2, handle3)
748        });
749        assert!(matches!(res1, Err(Error::Exited)));
750        assert!(matches!(res2, Err(Error::Exited)));
751        assert!(matches!(res3, Err(Error::Exited)));
752    }
753
754    fn test_select<R: Runner>(runner: R) {
755        runner.start(|_| async move {
756            // Test first branch
757            let output = Mutex::new(0);
758            select! {
759                v1 = ready(1) => {
760                    *output.lock().unwrap() = v1;
761                },
762                v2 = ready(2) => {
763                    *output.lock().unwrap() = v2;
764                },
765            };
766            assert_eq!(*output.lock().unwrap(), 1);
767
768            // Test second branch
769            select! {
770                v1 = std::future::pending::<i32>() => {
771                    *output.lock().unwrap() = v1;
772                },
773                v2 = ready(2) => {
774                    *output.lock().unwrap() = v2;
775                },
776            };
777            assert_eq!(*output.lock().unwrap(), 2);
778        });
779    }
780
781    /// Ensure future fusing works as expected.
782    fn test_select_loop<R: Runner>(runner: R)
783    where
784        R::Context: Clock,
785    {
786        runner.start(|context| async move {
787            // Should hit timeout
788            let (mut sender, mut receiver) = mpsc::unbounded();
789            for _ in 0..2 {
790                select! {
791                    v = receiver.next() => {
792                        panic!("unexpected value: {v:?}");
793                    },
794                    _ = context.sleep(Duration::from_millis(100)) => {
795                        continue;
796                    },
797                };
798            }
799
800            // Populate channel
801            sender.send(0).await.unwrap();
802            sender.send(1).await.unwrap();
803
804            // Prefer not reading channel without losing messages
805            select! {
806                _ = async {} => {
807                    // Skip reading from channel even though populated
808                },
809                v = receiver.next() => {
810                    panic!("unexpected value: {v:?}");
811                },
812            };
813
814            // Process messages
815            for i in 0..2 {
816                select! {
817                    _ = context.sleep(Duration::from_millis(100)) => {
818                        panic!("timeout");
819                    },
820                    v = receiver.next() => {
821                        assert_eq!(v.unwrap(), i);
822                    },
823                };
824            }
825        });
826    }
827
828    fn test_storage_operations<R: Runner>(runner: R)
829    where
830        R::Context: Storage,
831    {
832        runner.start(|context| async move {
833            let partition = "test_partition";
834            let name = b"test_blob";
835
836            // Open a new blob
837            let (blob, _) = context
838                .open(partition, name)
839                .await
840                .expect("Failed to open blob");
841
842            // Write data to the blob
843            let data = b"Hello, Storage!";
844            blob.write_at(Vec::from(data), 0)
845                .await
846                .expect("Failed to write to blob");
847
848            // Sync the blob
849            blob.sync().await.expect("Failed to sync blob");
850
851            // Read data from the blob
852            let read = blob
853                .read_at(vec![0; data.len()], 0)
854                .await
855                .expect("Failed to read from blob");
856            assert_eq!(read.as_ref(), data);
857
858            // Sync the blob
859            blob.sync().await.expect("Failed to sync blob");
860
861            // Scan blobs in the partition
862            let blobs = context
863                .scan(partition)
864                .await
865                .expect("Failed to scan partition");
866            assert!(blobs.contains(&name.to_vec()));
867
868            // Reopen the blob
869            let (blob, len) = context
870                .open(partition, name)
871                .await
872                .expect("Failed to reopen blob");
873            assert_eq!(len, data.len() as u64);
874
875            // Read data part of message back
876            let read = blob
877                .read_at(vec![0u8; 7], 7)
878                .await
879                .expect("Failed to read data");
880            assert_eq!(read.as_ref(), b"Storage");
881
882            // Sync the blob
883            blob.sync().await.expect("Failed to sync blob");
884
885            // Remove the blob
886            context
887                .remove(partition, Some(name))
888                .await
889                .expect("Failed to remove blob");
890
891            // Ensure the blob is removed
892            let blobs = context
893                .scan(partition)
894                .await
895                .expect("Failed to scan partition");
896            assert!(!blobs.contains(&name.to_vec()));
897
898            // Remove the partition
899            context
900                .remove(partition, None)
901                .await
902                .expect("Failed to remove partition");
903
904            // Scan the partition
905            let result = context.scan(partition).await;
906            assert!(matches!(result, Err(Error::PartitionMissing(_))));
907        });
908    }
909
910    fn test_blob_read_write<R: Runner>(runner: R)
911    where
912        R::Context: Storage,
913    {
914        runner.start(|context| async move {
915            let partition = "test_partition";
916            let name = b"test_blob_rw";
917
918            // Open a new blob
919            let (blob, _) = context
920                .open(partition, name)
921                .await
922                .expect("Failed to open blob");
923
924            // Write data at different offsets
925            let data1 = b"Hello";
926            let data2 = b"World";
927            blob.write_at(Vec::from(data1), 0)
928                .await
929                .expect("Failed to write data1");
930            blob.write_at(Vec::from(data2), 5)
931                .await
932                .expect("Failed to write data2");
933
934            // Read data back
935            let read = blob
936                .read_at(vec![0u8; 10], 0)
937                .await
938                .expect("Failed to read data");
939            assert_eq!(&read.as_ref()[..5], data1);
940            assert_eq!(&read.as_ref()[5..], data2);
941
942            // Read past end of blob
943            let result = blob.read_at(vec![0u8; 10], 10).await;
944            assert!(result.is_err());
945
946            // Rewrite data without affecting length
947            let data3 = b"Store";
948            blob.write_at(Vec::from(data3), 5)
949                .await
950                .expect("Failed to write data3");
951
952            // Read data back
953            let read = blob
954                .read_at(vec![0u8; 10], 0)
955                .await
956                .expect("Failed to read data");
957            assert_eq!(&read.as_ref()[..5], data1);
958            assert_eq!(&read.as_ref()[5..], data3);
959
960            // Read past end of blob
961            let result = blob.read_at(vec![0u8; 10], 10).await;
962            assert!(result.is_err());
963        });
964    }
965
966    fn test_blob_resize<R: Runner>(runner: R)
967    where
968        R::Context: Storage,
969    {
970        runner.start(|context| async move {
971            let partition = "test_partition_resize";
972            let name = b"test_blob_resize";
973
974            // Open and write to a new blob
975            let (blob, _) = context
976                .open(partition, name)
977                .await
978                .expect("Failed to open blob");
979
980            let data = b"some data";
981            blob.write_at(data.to_vec(), 0)
982                .await
983                .expect("Failed to write");
984            blob.sync().await.expect("Failed to sync after write");
985
986            // Re-open and check length
987            let (blob, len) = context.open(partition, name).await.unwrap();
988            assert_eq!(len, data.len() as u64);
989
990            // Resize to extend the file
991            let new_len = (data.len() as u64) * 2;
992            blob.resize(new_len)
993                .await
994                .expect("Failed to resize to extend");
995            blob.sync().await.expect("Failed to sync after resize");
996
997            // Re-open and check length again
998            let (blob, len) = context.open(partition, name).await.unwrap();
999            assert_eq!(len, new_len);
1000
1001            // Read original data
1002            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1003            assert_eq!(read_buf.as_ref(), data);
1004
1005            // Read extended part (should be zeros)
1006            let extended_part = blob
1007                .read_at(vec![0; data.len()], data.len() as u64)
1008                .await
1009                .unwrap();
1010            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1011
1012            // Truncate the blob
1013            blob.resize(data.len() as u64).await.unwrap();
1014            blob.sync().await.unwrap();
1015
1016            // Reopen to check truncation
1017            let (blob, size) = context.open(partition, name).await.unwrap();
1018            assert_eq!(size, data.len() as u64);
1019
1020            // Read truncated data
1021            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1022            assert_eq!(read_buf.as_ref(), data);
1023            blob.sync().await.unwrap();
1024        });
1025    }
1026
1027    fn test_many_partition_read_write<R: Runner>(runner: R)
1028    where
1029        R::Context: Storage,
1030    {
1031        runner.start(|context| async move {
1032            let partitions = ["partition1", "partition2", "partition3"];
1033            let name = b"test_blob_rw";
1034            let data1 = b"Hello";
1035            let data2 = b"World";
1036
1037            for (additional, partition) in partitions.iter().enumerate() {
1038                // Open a new blob
1039                let (blob, _) = context
1040                    .open(partition, name)
1041                    .await
1042                    .expect("Failed to open blob");
1043
1044                // Write data at different offsets
1045                blob.write_at(Vec::from(data1), 0)
1046                    .await
1047                    .expect("Failed to write data1");
1048                blob.write_at(Vec::from(data2), 5 + additional as u64)
1049                    .await
1050                    .expect("Failed to write data2");
1051
1052                // Sync the blob
1053                blob.sync().await.expect("Failed to sync blob");
1054            }
1055
1056            for (additional, partition) in partitions.iter().enumerate() {
1057                // Open a new blob
1058                let (blob, len) = context
1059                    .open(partition, name)
1060                    .await
1061                    .expect("Failed to open blob");
1062                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1063
1064                // Read data back
1065                let read = blob
1066                    .read_at(vec![0u8; 10 + additional], 0)
1067                    .await
1068                    .expect("Failed to read data");
1069                assert_eq!(&read.as_ref()[..5], b"Hello");
1070                assert_eq!(&read.as_ref()[5 + additional..], b"World");
1071            }
1072        });
1073    }
1074
1075    fn test_blob_read_past_length<R: Runner>(runner: R)
1076    where
1077        R::Context: Storage,
1078    {
1079        runner.start(|context| async move {
1080            let partition = "test_partition";
1081            let name = b"test_blob_rw";
1082
1083            // Open a new blob
1084            let (blob, _) = context
1085                .open(partition, name)
1086                .await
1087                .expect("Failed to open blob");
1088
1089            // Read data past file length (empty file)
1090            let result = blob.read_at(vec![0u8; 10], 0).await;
1091            assert!(result.is_err());
1092
1093            // Write data to the blob
1094            let data = b"Hello, Storage!".to_vec();
1095            blob.write_at(data, 0)
1096                .await
1097                .expect("Failed to write to blob");
1098
1099            // Read data past file length (non-empty file)
1100            let result = blob.read_at(vec![0u8; 20], 0).await;
1101            assert!(result.is_err());
1102        })
1103    }
1104
1105    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1106    where
1107        R::Context: Spawner + Storage + Metrics,
1108    {
1109        runner.start(|context| async move {
1110            let partition = "test_partition";
1111            let name = b"test_blob_rw";
1112
1113            // Open a new blob
1114            let (blob, _) = context
1115                .open(partition, name)
1116                .await
1117                .expect("Failed to open blob");
1118
1119            // Write data to the blob
1120            let data = b"Hello, Storage!";
1121            blob.write_at(Vec::from(data), 0)
1122                .await
1123                .expect("Failed to write to blob");
1124
1125            // Sync the blob
1126            blob.sync().await.expect("Failed to sync blob");
1127
1128            // Read data from the blob in clone
1129            let check1 = context.with_label("check1").spawn({
1130                let blob = blob.clone();
1131                move |_| async move {
1132                    let read = blob
1133                        .read_at(vec![0u8; data.len()], 0)
1134                        .await
1135                        .expect("Failed to read from blob");
1136                    assert_eq!(read.as_ref(), data);
1137                }
1138            });
1139            let check2 = context.with_label("check2").spawn({
1140                let blob = blob.clone();
1141                move |_| async move {
1142                    let read = blob
1143                        .read_at(vec![0; data.len()], 0)
1144                        .await
1145                        .expect("Failed to read from blob");
1146                    assert_eq!(read.as_ref(), data);
1147                }
1148            });
1149
1150            // Wait for both reads to complete
1151            let result = join!(check1, check2);
1152            assert!(result.0.is_ok());
1153            assert!(result.1.is_ok());
1154
1155            // Read data from the blob
1156            let read = blob
1157                .read_at(vec![0; data.len()], 0)
1158                .await
1159                .expect("Failed to read from blob");
1160            assert_eq!(read.as_ref(), data);
1161
1162            // Drop the blob
1163            drop(blob);
1164
1165            // Ensure no blobs still open
1166            let buffer = context.encode();
1167            assert!(buffer.contains("open_blobs 0"));
1168        });
1169    }
1170
1171    fn test_shutdown<R: Runner>(runner: R)
1172    where
1173        R::Context: Spawner + Metrics + Clock,
1174    {
1175        let kill = 9;
1176        runner.start(|context| async move {
1177            // Spawn a task that waits for signal
1178            let before = context
1179                .with_label("before")
1180                .spawn(move |context| async move {
1181                    let mut signal = context.stopped();
1182                    let value = (&mut signal).await.unwrap();
1183                    assert_eq!(value, kill);
1184                    drop(signal);
1185                });
1186
1187            // Signal the tasks and wait for them to stop
1188            let result = context.clone().stop(kill, None).await;
1189            assert!(result.is_ok());
1190
1191            // Spawn a task after stop is called
1192            let after = context
1193                .with_label("after")
1194                .spawn(move |context| async move {
1195                    // A call to `stopped()` after `stop()` resolves immediately
1196                    let value = context.stopped().await.unwrap();
1197                    assert_eq!(value, kill);
1198                });
1199
1200            // Ensure both tasks complete
1201            let result = join!(before, after);
1202            assert!(result.0.is_ok());
1203            assert!(result.1.is_ok());
1204        });
1205    }
1206
    /// Spawns three tasks that each hold a stop signal, sleep for a different duration
    /// after receiving the stop value, and then bump a shared counter before dropping
    /// the signal. Asserts that the counter already reads 3 when `stop` resolves and
    /// that all three task handles resolve `Ok`.
    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            // Channel the tasks use to report that they hold their signal
            let (started_tx, mut started_rx) = mpsc::channel(3);
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn 3 tasks that do cleanup work after receiving stop signal
            // and increment a shared counter
            let task = |cleanup_duration: Duration| {
                // Per-task clones so the closure can be called repeatedly
                let context = context.clone();
                let counter = counter.clone();
                let mut started_tx = started_tx.clone();
                context.spawn(move |context| async move {
                    // Acquire the signal first, then report readiness
                    let mut signal = context.stopped();
                    started_tx.send(()).await.unwrap();

                    // Increment once killed (after the simulated cleanup delay)
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    context.sleep(cleanup_duration).await;
                    counter.fetch_add(1, Ordering::SeqCst);

                    // Wait to drop signal until work has been done
                    drop(signal);
                })
            };

            let task1 = task(Duration::from_millis(10));
            let task2 = task(Duration::from_millis(20));
            let task3 = task(Duration::from_millis(30));

            // Wait until every task has reported that it holds its signal
            for _ in 0..3 {
                started_rx.next().await.unwrap();
            }

            // Stop and verify all cleanup completed
            context.stop(kill, None).await.unwrap();
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // Ensure all tasks completed
            let result = join!(task1, task2, task3);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            assert!(result.2.is_ok());
        });
    }
1258
1259    fn test_shutdown_timeout<R: Runner>(runner: R)
1260    where
1261        R::Context: Spawner + Metrics + Clock,
1262    {
1263        let kill = 42;
1264        runner.start(|context| async move {
1265            // Setup startup coordinator
1266            let (started_tx, started_rx) = oneshot::channel();
1267
1268            // Spawn a task that never completes its cleanup
1269            context.clone().spawn(move |context| async move {
1270                let signal = context.stopped();
1271                started_tx.send(()).unwrap();
1272                pending::<()>().await;
1273                signal.await.unwrap();
1274            });
1275
1276            // Try to stop with a timeout
1277            started_rx.await.unwrap();
1278            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1279
1280            // Assert that we got a timeout error
1281            assert!(matches!(result, Err(Error::Timeout)));
1282        });
1283    }
1284
    /// Issues two overlapping `stop` calls carrying different values while a task is
    /// still holding its stop signal. Asserts that both calls are pending at first and
    /// later resolve `Ok`, that `stopped()` reports the first value, that the task ran
    /// its post-signal work exactly once, and that a further `stop` call made afterwards
    /// is ready on its first poll.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays completion to test timing
            let task = context.with_label("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Acquire the signal first, then report readiness
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // Wait for signal to be resolved; the task expects the first stop value
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    context.sleep(Duration::from_millis(50)).await;

                    // Increment counter, and only then release the signal
                    counter.fetch_add(1, Ordering::SeqCst);
                    drop(signal);
                }
            });

            // Wait until the task has reported that it holds its signal
            started_rx.await.unwrap();

            // Issue two separate stop calls
            // The second stop call uses a different stop value that should be ignored
            let stop_task1 = context.clone().stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.clone().stop(kill2, None);
            pin_mut!(stop_task2);

            // Both of them should be awaiting completion
            // (a single poll of each must not produce a result yet)
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Wait for both stop calls to complete
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // Verify first stop value wins
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // Wait for blocking task to complete
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Post-completion stop should return immediately
            // (`now_or_never` must yield a value on its single poll)
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1346
1347    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1348    where
1349        R::Context: Spawner + Metrics,
1350    {
1351        runner.start(|context| async move {
1352            // Spawn a task that waits for signal
1353            context
1354                .with_label("before")
1355                .spawn(move |context| async move {
1356                    let mut signal = context.stopped();
1357                    let value = (&mut signal).await.unwrap();
1358
1359                    // We should never reach this point
1360                    assert_eq!(value, 42);
1361                    drop(signal);
1362                });
1363
1364            // Ensure waker is registered
1365            reschedule().await;
1366        });
1367    }
1368
1369    fn test_spawn_dedicated<R: Runner>(runner: R)
1370    where
1371        R::Context: Spawner,
1372    {
1373        runner.start(|context| async move {
1374            let handle = context.dedicated().spawn(|_| async move { 42 });
1375            assert!(matches!(handle.await, Ok(42)));
1376        });
1377    }
1378
1379    fn test_spawn<R: Runner>(runner: R)
1380    where
1381        R::Context: Spawner + Clock,
1382    {
1383        runner.start(|context| async move {
1384            let child_handle = Arc::new(Mutex::new(None));
1385            let child_handle2 = child_handle.clone();
1386
1387            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1388            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1389            let parent_handle = context.spawn(move |context| async move {
1390                // Spawn child that completes immediately
1391                let handle = context.spawn(|_| async {});
1392
1393                // Store child handle so we can test it later
1394                *child_handle2.lock().unwrap() = Some(handle);
1395
1396                parent_initialized_tx.send(()).unwrap();
1397
1398                // Parent task completes
1399                parent_complete_rx.await.unwrap();
1400            });
1401
1402            // Wait for parent task to spawn the children
1403            parent_initialized_rx.await.unwrap();
1404
1405            // Child task completes successfully
1406            let child_handle = child_handle.lock().unwrap().take().unwrap();
1407            assert!(child_handle.await.is_ok());
1408
1409            // Complete the parent task
1410            parent_complete_tx.send(()).unwrap();
1411
1412            // Wait for parent task to complete successfully
1413            assert!(parent_handle.await.is_ok());
1414        });
1415    }
1416
1417    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1418    where
1419        R::Context: Spawner + Clock,
1420    {
1421        runner.start(|context| async move {
1422            let child_handle = Arc::new(Mutex::new(None));
1423            let child_handle2 = child_handle.clone();
1424
1425            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1426            let parent_handle = context.spawn(move |context| async move {
1427                // Spawn child task that hangs forever, should be aborted when parent aborts
1428                let handle = context.spawn(|_| pending::<()>());
1429
1430                // Store child task handle so we can test it later
1431                *child_handle2.lock().unwrap() = Some(handle);
1432
1433                parent_initialized_tx.send(()).unwrap();
1434
1435                // Parent task runs until aborted
1436                pending::<()>().await
1437            });
1438
1439            // Wait for parent task to spawn the children
1440            parent_initialized_rx.await.unwrap();
1441
1442            // Abort parent task
1443            parent_handle.abort();
1444            assert!(matches!(parent_handle.await, Err(Error::Closed)));
1445
1446            // Child task should also resolve with error since its parent aborted
1447            let child_handle = child_handle.lock().unwrap().take().unwrap();
1448            assert!(matches!(child_handle.await, Err(Error::Closed)));
1449        });
1450    }
1451
1452    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1453    where
1454        R::Context: Spawner + Clock,
1455    {
1456        runner.start(|context| async move {
1457            let child_handle = Arc::new(Mutex::new(None));
1458            let child_handle2 = child_handle.clone();
1459
1460            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1461            let parent_handle = context.spawn(move |context| async move {
1462                // Spawn child task that hangs forever, should be aborted when parent completes
1463                let handle = context.spawn(|_| pending::<()>());
1464
1465                // Store child task handle so we can test it later
1466                *child_handle2.lock().unwrap() = Some(handle);
1467
1468                // Parent task completes
1469                parent_complete_rx.await.unwrap();
1470            });
1471
1472            // Fire parent completion
1473            parent_complete_tx.send(()).unwrap();
1474
1475            // Wait for parent task to complete
1476            assert!(parent_handle.await.is_ok());
1477
1478            // Child task should resolve with error since its parent has completed
1479            let child_handle = child_handle.lock().unwrap().take().unwrap();
1480            assert!(matches!(child_handle.await, Err(Error::Closed)));
1481        });
1482    }
1483
    /// Builds a three-level task tree (one root, three children, six grandchildren) in
    /// which every task runs until aborted, aborts the root, and asserts that the root
    /// handle and all nine descendant handles resolve with `Error::Closed`.
    fn test_spawn_cascading_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // We create the following tree of tasks. All tasks will run
            // indefinitely (until aborted).
            //
            //          root
            //     /     |     \
            //    /      |      \
            //   c0      c1      c2
            //  /  \    /  \    /  \
            // g0  g1  g2  g3  g4  g5
            //
            // Each child context is cloned from the root context, and each grandchild
            // context is cloned from its child's context.
            let c0 = context.clone();
            let g0 = c0.clone();
            let g1 = c0.clone();
            let c1 = context.clone();
            let g2 = c1.clone();
            let g3 = c1.clone();
            let c2 = context.clone();
            let g4 = c2.clone();
            let g5 = c2.clone();

            // Spawn tasks
            // `handles` collects every descendant handle; the channel counts spawned tasks
            let handles = Arc::new(Mutex::new(Vec::new()));
            let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
            let root_task = context.spawn({
                let handles = handles.clone();
                move |_| async move {
                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                    {
                        // Spawn one child, which in turn spawns its two grandchildren
                        let handle = context.spawn({
                            let handles = handles.clone();
                            let mut initialized_tx = initialized_tx.clone();
                            move |_| async move {
                                for grandchild in grandchildren {
                                    let handle = grandchild.spawn(|_| async {
                                        pending::<()>().await;
                                    });
                                    // Record the grandchild and report it as spawned
                                    handles.lock().unwrap().push(handle);
                                    initialized_tx.send(()).await.unwrap();
                                }

                                // Child runs until aborted
                                pending::<()>().await;
                            }
                        });
                        // Record the child and report it as spawned
                        handles.lock().unwrap().push(handle);
                        initialized_tx.send(()).await.unwrap();
                    }

                    // Root runs until aborted
                    pending::<()>().await;
                }
            });

            // Wait for tasks to initialize
            for _ in 0..9 {
                initialized_rx.next().await.unwrap();
            }

            // Verify we have all 9 handles (3 children + 6 grandchildren)
            assert_eq!(handles.lock().unwrap().len(), 9);

            // Abort root task
            root_task.abort();
            assert!(matches!(root_task.await, Err(Error::Closed)));

            // All handles should resolve with error due to cascading abort
            // (drain first so the lock guard is released before awaiting)
            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
            for handle in handles {
                assert!(matches!(handle.await, Err(Error::Closed)));
            }
        });
    }
1558
1559    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1560    where
1561        R::Context: Spawner + Clock,
1562    {
1563        runner.start(|context| async move {
1564            let (child_started_tx, child_started_rx) = oneshot::channel();
1565            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1566            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1567            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1568            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1569            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1570            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1571
1572            let parent = context.spawn(move |context| async move {
1573                // Spawn a child task
1574                let child_handle = context.clone().spawn(|_| async move {
1575                    child_started_tx.send(()).unwrap();
1576                    // Wait for signal to complete
1577                    child_complete_rx.await.unwrap();
1578                });
1579                assert!(
1580                    child_handle_tx.send(child_handle).is_ok(),
1581                    "child handle receiver dropped"
1582                );
1583
1584                // Spawn an independent sibling task
1585                let sibling_handle = context.clone().spawn(move |_| async move {
1586                    sibling_started_tx.send(()).unwrap();
1587                    // Wait for signal to complete
1588                    sibling_complete_rx.await.unwrap();
1589                });
1590                assert!(
1591                    sibling_handle_tx.send(sibling_handle).is_ok(),
1592                    "sibling handle receiver dropped"
1593                );
1594
1595                // Wait for signal to complete
1596                parent_complete_rx.await.unwrap();
1597            });
1598
1599            // Wait for both to start
1600            child_started_rx.await.unwrap();
1601            sibling_started_rx.await.unwrap();
1602
1603            // Kill the sibling
1604            sibling_complete_tx.send(()).unwrap();
1605            assert!(sibling_handle_rx.await.is_ok());
1606
1607            // The child task should still be alive
1608            child_complete_tx.send(()).unwrap();
1609            assert!(child_handle_rx.await.is_ok());
1610
1611            // As well as the parent
1612            parent_complete_tx.send(()).unwrap();
1613            assert!(parent.await.is_ok());
1614        });
1615    }
1616
1617    fn test_spawn_clone_chain<R: Runner>(runner: R)
1618    where
1619        R::Context: Spawner + Clock,
1620    {
1621        runner.start(|context| async move {
1622            let (parent_started_tx, parent_started_rx) = oneshot::channel();
1623            let (child_started_tx, child_started_rx) = oneshot::channel();
1624            let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1625            let (child_handle_tx, child_handle_rx) = oneshot::channel();
1626            let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1627
1628            let parent = context.clone().spawn({
1629                move |context| async move {
1630                    let child = context.clone().spawn({
1631                        move |context| async move {
1632                            let grandchild = context.clone().spawn({
1633                                move |_| async move {
1634                                    grandchild_started_tx.send(()).unwrap();
1635                                    pending::<()>().await;
1636                                }
1637                            });
1638                            assert!(
1639                                grandchild_handle_tx.send(grandchild).is_ok(),
1640                                "grandchild handle receiver dropped"
1641                            );
1642                            child_started_tx.send(()).unwrap();
1643                            pending::<()>().await;
1644                        }
1645                    });
1646                    assert!(
1647                        child_handle_tx.send(child).is_ok(),
1648                        "child handle receiver dropped"
1649                    );
1650                    parent_started_tx.send(()).unwrap();
1651                    pending::<()>().await;
1652                }
1653            });
1654
1655            parent_started_rx.await.unwrap();
1656            child_started_rx.await.unwrap();
1657            grandchild_started_rx.await.unwrap();
1658
1659            let child_handle = child_handle_rx.await.unwrap();
1660            let grandchild_handle = grandchild_handle_rx.await.unwrap();
1661
1662            parent.abort();
1663            assert!(parent.await.is_err());
1664
1665            assert!(child_handle.await.is_err());
1666            assert!(grandchild_handle.await.is_err());
1667        });
1668    }
1669
1670    fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1671    where
1672        R::Context: Spawner + Clock,
1673    {
1674        runner.start(|context| async move {
1675            let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1676            let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1677
1678            let parent = context.clone().spawn({
1679                move |context| async move {
1680                    let clone1 = context.clone();
1681                    let clone2 = clone1.clone();
1682                    let clone3 = clone2.clone();
1683
1684                    let leaf = clone3.clone().spawn({
1685                        move |_| async move {
1686                            leaf_started_tx.send(()).unwrap();
1687                            pending::<()>().await;
1688                        }
1689                    });
1690
1691                    leaf_handle_tx
1692                        .send(leaf)
1693                        .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1694                    pending::<()>().await;
1695                }
1696            });
1697
1698            leaf_started_rx.await.unwrap();
1699            let leaf_handle = leaf_handle_rx.await.unwrap();
1700
1701            parent.abort();
1702            assert!(parent.await.is_err());
1703            assert!(leaf_handle.await.is_err());
1704        });
1705    }
1706
1707    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1708    where
1709        R::Context: Spawner,
1710    {
1711        runner.start(|context| async move {
1712            let context = if dedicated {
1713                context.dedicated()
1714            } else {
1715                context.shared(true)
1716            };
1717
1718            let handle = context.spawn(|_| async move { 42 });
1719            let result = handle.await;
1720            assert!(matches!(result, Ok(42)));
1721        });
1722    }
1723
1724    fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1725    where
1726        R::Context: Spawner + Clock,
1727    {
1728        runner.start(|context| async move {
1729            let context = if dedicated {
1730                context.dedicated()
1731            } else {
1732                context.shared(true)
1733            };
1734
1735            context.clone().spawn(|_| async move {
1736                panic!("blocking task panicked");
1737            });
1738
1739            // Loop until panic
1740            loop {
1741                context.sleep(Duration::from_millis(100)).await;
1742            }
1743        });
1744    }
1745
1746    fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1747    where
1748        R::Context: Spawner + Clock,
1749    {
1750        let result: Result<(), Error> = runner.start(|context| async move {
1751            let context = if dedicated {
1752                context.dedicated()
1753            } else {
1754                context.shared(true)
1755            };
1756
1757            let handle = context.clone().spawn(|_| async move {
1758                panic!("blocking task panicked");
1759            });
1760            handle.await
1761        });
1762        assert!(matches!(result, Err(Error::Exited)));
1763    }
1764
1765    fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1766        runner.start(|_| async move {
1767            // Setup tracked resource
1768            let dropper = Arc::new(());
1769            let executor = deterministic::Runner::default();
1770            executor.start({
1771                let dropper = dropper.clone();
1772                move |context| async move {
1773                    // Create tasks with circular dependencies through channels
1774                    let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1775                    let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1776                    let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1777
1778                    // Task 1 holds tx2 and waits on rx1
1779                    context.with_label("task1").spawn({
1780                        let mut setup_tx = setup_tx.clone();
1781                        let dropper = dropper.clone();
1782                        move |_| async move {
1783                            // Setup deadlock and mark ready
1784                            tx2.send(()).await.unwrap();
1785                            rx1.next().await.unwrap();
1786                            setup_tx.send(()).await.unwrap();
1787
1788                            // Wait forever
1789                            while rx1.next().await.is_some() {}
1790                            drop(tx2);
1791                            drop(dropper);
1792                        }
1793                    });
1794
1795                    // Task 2 holds tx1 and waits on rx2
1796                    context.with_label("task2").spawn(move |_| async move {
1797                        // Setup deadlock and mark ready
1798                        tx1.send(()).await.unwrap();
1799                        rx2.next().await.unwrap();
1800                        setup_tx.send(()).await.unwrap();
1801
1802                        // Wait forever
1803                        while rx2.next().await.is_some() {}
1804                        drop(tx1);
1805                        drop(dropper);
1806                    });
1807
1808                    // Wait for tasks to start
1809                    setup_rx.next().await.unwrap();
1810                    setup_rx.next().await.unwrap();
1811                }
1812            });
1813
1814            // After runtime drop, both tasks should be cleaned up
1815            Arc::try_unwrap(dropper).expect("references remaining");
1816        });
1817    }
1818
1819    fn test_late_waker<R: Runner>(runner: R)
1820    where
1821        R::Context: Metrics + Spawner,
1822    {
1823        // A future that captures its waker and sends it to the caller, then
1824        // stays pending forever.
1825        struct CaptureWaker {
1826            tx: Option<oneshot::Sender<Waker>>,
1827            sent: bool,
1828        }
1829        impl Future for CaptureWaker {
1830            type Output = ();
1831            fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1832                if !self.sent {
1833                    if let Some(tx) = self.tx.take() {
1834                        // Send a clone of the current task's waker to the root
1835                        let _ = tx.send(cx.waker().clone());
1836                    }
1837                    self.sent = true;
1838                }
1839                Poll::Pending
1840            }
1841        }
1842
1843        // A guard that wakes the captured waker on drop.
1844        struct WakeOnDrop(Option<Waker>);
1845        impl Drop for WakeOnDrop {
1846            fn drop(&mut self) {
1847                if let Some(w) = self.0.take() {
1848                    w.wake_by_ref();
1849                }
1850            }
1851        }
1852
1853        // Run the executor to completion
1854        let holder = runner.start(|context| async move {
1855            // Wire a oneshot to receive the task waker.
1856            let (tx, rx) = oneshot::channel::<Waker>();
1857
1858            // Spawn a task that registers its waker and then stays pending.
1859            context
1860                .with_label("capture-waker")
1861                .spawn(move |_| async move {
1862                    CaptureWaker {
1863                        tx: Some(tx),
1864                        sent: false,
1865                    }
1866                    .await;
1867                });
1868
1869            // Ensure the spawned task runs and registers its waker.
1870            utils::reschedule().await;
1871
1872            // Receive the waker from the spawned task.
1873            let waker = rx.await.expect("waker not received");
1874
1875            // Return a guard that will wake after the runtime has dropped.
1876            WakeOnDrop(Some(waker))
1877        });
1878
1879        // Dropping the guard after the runtime has torn down will trigger a wake on
1880        // a task whose executor has been dropped.
1881        drop(holder);
1882    }
1883
1884    fn test_metrics<R: Runner>(runner: R)
1885    where
1886        R::Context: Metrics,
1887    {
1888        runner.start(|context| async move {
1889            // Assert label
1890            assert_eq!(context.label(), "");
1891
1892            // Register a metric
1893            let counter = Counter::<u64>::default();
1894            context.register("test", "test", counter.clone());
1895
1896            // Increment the counter
1897            counter.inc();
1898
1899            // Encode metrics
1900            let buffer = context.encode();
1901            assert!(buffer.contains("test_total 1"));
1902
1903            // Nested context
1904            let context = context.with_label("nested");
1905            let nested_counter = Counter::<u64>::default();
1906            context.register("test", "test", nested_counter.clone());
1907
1908            // Increment the counter
1909            nested_counter.inc();
1910
1911            // Encode metrics
1912            let buffer = context.encode();
1913            assert!(buffer.contains("nested_test_total 1"));
1914            assert!(buffer.contains("test_total 1"));
1915        });
1916    }
1917
1918    fn test_metrics_label<R: Runner>(runner: R)
1919    where
1920        R::Context: Metrics,
1921    {
1922        runner.start(|context| async move {
1923            context.with_label(METRICS_PREFIX);
1924        })
1925    }
1926
1927    #[test]
1928    fn test_deterministic_future() {
1929        let runner = deterministic::Runner::default();
1930        test_error_future(runner);
1931    }
1932
1933    #[test]
1934    fn test_deterministic_clock_sleep() {
1935        let executor = deterministic::Runner::default();
1936        test_clock_sleep(executor);
1937    }
1938
1939    #[test]
1940    fn test_deterministic_clock_sleep_until() {
1941        let executor = deterministic::Runner::default();
1942        test_clock_sleep_until(executor);
1943    }
1944
1945    #[test]
1946    fn test_deterministic_clock_timeout() {
1947        let executor = deterministic::Runner::default();
1948        test_clock_timeout(executor);
1949    }
1950
1951    #[test]
1952    fn test_deterministic_root_finishes() {
1953        let executor = deterministic::Runner::default();
1954        test_root_finishes(executor);
1955    }
1956
1957    #[test]
1958    fn test_deterministic_spawn_after_abort() {
1959        let executor = deterministic::Runner::default();
1960        test_spawn_after_abort(executor);
1961    }
1962
1963    #[test]
1964    fn test_deterministic_spawn_abort() {
1965        let executor = deterministic::Runner::default();
1966        test_spawn_abort(executor, false, false);
1967    }
1968
1969    #[test]
1970    #[should_panic(expected = "blah")]
1971    fn test_deterministic_panic_aborts_root() {
1972        let runner = deterministic::Runner::default();
1973        test_panic_aborts_root(runner);
1974    }
1975
1976    #[test]
1977    #[should_panic(expected = "blah")]
1978    fn test_deterministic_panic_aborts_root_caught() {
1979        let cfg = deterministic::Config::default().with_catch_panics(true);
1980        let runner = deterministic::Runner::new(cfg);
1981        test_panic_aborts_root(runner);
1982    }
1983
1984    #[test]
1985    #[should_panic(expected = "blah")]
1986    fn test_deterministic_panic_aborts_spawn() {
1987        let executor = deterministic::Runner::default();
1988        test_panic_aborts_spawn(executor);
1989    }
1990
1991    #[test]
1992    fn test_deterministic_panic_aborts_spawn_caught() {
1993        let cfg = deterministic::Config::default().with_catch_panics(true);
1994        let executor = deterministic::Runner::new(cfg);
1995        test_panic_aborts_spawn_caught(executor);
1996    }
1997
1998    #[test]
1999    #[should_panic(expected = "boom")]
2000    fn test_deterministic_multiple_panics() {
2001        let executor = deterministic::Runner::default();
2002        test_multiple_panics(executor);
2003    }
2004
2005    #[test]
2006    fn test_deterministic_multiple_panics_caught() {
2007        let cfg = deterministic::Config::default().with_catch_panics(true);
2008        let executor = deterministic::Runner::new(cfg);
2009        test_multiple_panics_caught(executor);
2010    }
2011
2012    #[test]
2013    fn test_deterministic_select() {
2014        let executor = deterministic::Runner::default();
2015        test_select(executor);
2016    }
2017
2018    #[test]
2019    fn test_deterministic_select_loop() {
2020        let executor = deterministic::Runner::default();
2021        test_select_loop(executor);
2022    }
2023
2024    #[test]
2025    fn test_deterministic_storage_operations() {
2026        let executor = deterministic::Runner::default();
2027        test_storage_operations(executor);
2028    }
2029
2030    #[test]
2031    fn test_deterministic_blob_read_write() {
2032        let executor = deterministic::Runner::default();
2033        test_blob_read_write(executor);
2034    }
2035
2036    #[test]
2037    fn test_deterministic_blob_resize() {
2038        let executor = deterministic::Runner::default();
2039        test_blob_resize(executor);
2040    }
2041
2042    #[test]
2043    fn test_deterministic_many_partition_read_write() {
2044        let executor = deterministic::Runner::default();
2045        test_many_partition_read_write(executor);
2046    }
2047
2048    #[test]
2049    fn test_deterministic_blob_read_past_length() {
2050        let executor = deterministic::Runner::default();
2051        test_blob_read_past_length(executor);
2052    }
2053
2054    #[test]
2055    fn test_deterministic_blob_clone_and_concurrent_read() {
2056        // Run test
2057        let executor = deterministic::Runner::default();
2058        test_blob_clone_and_concurrent_read(executor);
2059    }
2060
2061    #[test]
2062    fn test_deterministic_shutdown() {
2063        let executor = deterministic::Runner::default();
2064        test_shutdown(executor);
2065    }
2066
2067    #[test]
2068    fn test_deterministic_shutdown_multiple_signals() {
2069        let executor = deterministic::Runner::default();
2070        test_shutdown_multiple_signals(executor);
2071    }
2072
2073    #[test]
2074    fn test_deterministic_shutdown_timeout() {
2075        let executor = deterministic::Runner::default();
2076        test_shutdown_timeout(executor);
2077    }
2078
2079    #[test]
2080    fn test_deterministic_shutdown_multiple_stop_calls() {
2081        let executor = deterministic::Runner::default();
2082        test_shutdown_multiple_stop_calls(executor);
2083    }
2084
2085    #[test]
2086    fn test_deterministic_unfulfilled_shutdown() {
2087        let executor = deterministic::Runner::default();
2088        test_unfulfilled_shutdown(executor);
2089    }
2090
2091    #[test]
2092    fn test_deterministic_spawn_dedicated() {
2093        let executor = deterministic::Runner::default();
2094        test_spawn_dedicated(executor);
2095    }
2096
2097    #[test]
2098    fn test_deterministic_spawn() {
2099        let runner = deterministic::Runner::default();
2100        test_spawn(runner);
2101    }
2102
2103    #[test]
2104    fn test_deterministic_spawn_abort_on_parent_abort() {
2105        let runner = deterministic::Runner::default();
2106        test_spawn_abort_on_parent_abort(runner);
2107    }
2108
2109    #[test]
2110    fn test_deterministic_spawn_abort_on_parent_completion() {
2111        let runner = deterministic::Runner::default();
2112        test_spawn_abort_on_parent_completion(runner);
2113    }
2114
2115    #[test]
2116    fn test_deterministic_spawn_cascading_abort() {
2117        let runner = deterministic::Runner::default();
2118        test_spawn_cascading_abort(runner);
2119    }
2120
2121    #[test]
2122    fn test_deterministic_child_survives_sibling_completion() {
2123        let runner = deterministic::Runner::default();
2124        test_child_survives_sibling_completion(runner);
2125    }
2126
2127    #[test]
2128    fn test_deterministic_spawn_clone_chain() {
2129        let runner = deterministic::Runner::default();
2130        test_spawn_clone_chain(runner);
2131    }
2132
2133    #[test]
2134    fn test_deterministic_spawn_sparse_clone_chain() {
2135        let runner = deterministic::Runner::default();
2136        test_spawn_sparse_clone_chain(runner);
2137    }
2138
2139    #[test]
2140    fn test_deterministic_spawn_blocking() {
2141        for dedicated in [false, true] {
2142            let executor = deterministic::Runner::default();
2143            test_spawn_blocking(executor, dedicated);
2144        }
2145    }
2146
2147    #[test]
2148    #[should_panic(expected = "blocking task panicked")]
2149    fn test_deterministic_spawn_blocking_panic() {
2150        for dedicated in [false, true] {
2151            let executor = deterministic::Runner::default();
2152            test_spawn_blocking_panic(executor, dedicated);
2153        }
2154    }
2155
2156    #[test]
2157    fn test_deterministic_spawn_blocking_panic_caught() {
2158        for dedicated in [false, true] {
2159            let cfg = deterministic::Config::default().with_catch_panics(true);
2160            let executor = deterministic::Runner::new(cfg);
2161            test_spawn_blocking_panic_caught(executor, dedicated);
2162        }
2163    }
2164
2165    #[test]
2166    fn test_deterministic_spawn_blocking_abort() {
2167        for (dedicated, blocking) in [(false, true), (true, false)] {
2168            let executor = deterministic::Runner::default();
2169            test_spawn_abort(executor, dedicated, blocking);
2170        }
2171    }
2172
2173    #[test]
2174    fn test_deterministic_circular_reference_prevents_cleanup() {
2175        let executor = deterministic::Runner::default();
2176        test_circular_reference_prevents_cleanup(executor);
2177    }
2178
2179    #[test]
2180    fn test_deterministic_late_waker() {
2181        let executor = deterministic::Runner::default();
2182        test_late_waker(executor);
2183    }
2184
2185    #[test]
2186    fn test_deterministic_metrics() {
2187        let executor = deterministic::Runner::default();
2188        test_metrics(executor);
2189    }
2190
2191    #[test]
2192    #[should_panic]
2193    fn test_deterministic_metrics_label() {
2194        let executor = deterministic::Runner::default();
2195        test_metrics_label(executor);
2196    }
2197
2198    #[test]
2199    fn test_tokio_error_future() {
2200        let runner = tokio::Runner::default();
2201        test_error_future(runner);
2202    }
2203
2204    #[test]
2205    fn test_tokio_clock_sleep() {
2206        let executor = tokio::Runner::default();
2207        test_clock_sleep(executor);
2208    }
2209
2210    #[test]
2211    fn test_tokio_clock_sleep_until() {
2212        let executor = tokio::Runner::default();
2213        test_clock_sleep_until(executor);
2214    }
2215
2216    #[test]
2217    fn test_tokio_clock_timeout() {
2218        let executor = tokio::Runner::default();
2219        test_clock_timeout(executor);
2220    }
2221
2222    #[test]
2223    fn test_tokio_root_finishes() {
2224        let executor = tokio::Runner::default();
2225        test_root_finishes(executor);
2226    }
2227
2228    #[test]
2229    fn test_tokio_spawn_after_abort() {
2230        let executor = tokio::Runner::default();
2231        test_spawn_after_abort(executor);
2232    }
2233
2234    #[test]
2235    fn test_tokio_spawn_abort() {
2236        let executor = tokio::Runner::default();
2237        test_spawn_abort(executor, false, false);
2238    }
2239
2240    #[test]
2241    #[should_panic(expected = "blah")]
2242    fn test_tokio_panic_aborts_root() {
2243        let executor = tokio::Runner::default();
2244        test_panic_aborts_root(executor);
2245    }
2246
2247    #[test]
2248    #[should_panic(expected = "blah")]
2249    fn test_tokio_panic_aborts_root_caught() {
2250        let cfg = tokio::Config::default().with_catch_panics(true);
2251        let executor = tokio::Runner::new(cfg);
2252        test_panic_aborts_root(executor);
2253    }
2254
2255    #[test]
2256    #[should_panic(expected = "blah")]
2257    fn test_tokio_panic_aborts_spawn() {
2258        let executor = tokio::Runner::default();
2259        test_panic_aborts_spawn(executor);
2260    }
2261
2262    #[test]
2263    fn test_tokio_panic_aborts_spawn_caught() {
2264        let cfg = tokio::Config::default().with_catch_panics(true);
2265        let executor = tokio::Runner::new(cfg);
2266        test_panic_aborts_spawn_caught(executor);
2267    }
2268
2269    #[test]
2270    #[should_panic(expected = "boom")]
2271    fn test_tokio_multiple_panics() {
2272        let executor = tokio::Runner::default();
2273        test_multiple_panics(executor);
2274    }
2275
2276    #[test]
2277    fn test_tokio_multiple_panics_caught() {
2278        let cfg = tokio::Config::default().with_catch_panics(true);
2279        let executor = tokio::Runner::new(cfg);
2280        test_multiple_panics_caught(executor);
2281    }
2282
2283    #[test]
2284    fn test_tokio_select() {
2285        let executor = tokio::Runner::default();
2286        test_select(executor);
2287    }
2288
2289    #[test]
2290    fn test_tokio_select_loop() {
2291        let executor = tokio::Runner::default();
2292        test_select_loop(executor);
2293    }
2294
2295    #[test]
2296    fn test_tokio_storage_operations() {
2297        let executor = tokio::Runner::default();
2298        test_storage_operations(executor);
2299    }
2300
2301    #[test]
2302    fn test_tokio_blob_read_write() {
2303        let executor = tokio::Runner::default();
2304        test_blob_read_write(executor);
2305    }
2306
2307    #[test]
2308    fn test_tokio_blob_resize() {
2309        let executor = tokio::Runner::default();
2310        test_blob_resize(executor);
2311    }
2312
2313    #[test]
2314    fn test_tokio_many_partition_read_write() {
2315        let executor = tokio::Runner::default();
2316        test_many_partition_read_write(executor);
2317    }
2318
2319    #[test]
2320    fn test_tokio_blob_read_past_length() {
2321        let executor = tokio::Runner::default();
2322        test_blob_read_past_length(executor);
2323    }
2324
2325    #[test]
2326    fn test_tokio_blob_clone_and_concurrent_read() {
2327        // Run test
2328        let executor = tokio::Runner::default();
2329        test_blob_clone_and_concurrent_read(executor);
2330    }
2331
2332    #[test]
2333    fn test_tokio_shutdown() {
2334        let executor = tokio::Runner::default();
2335        test_shutdown(executor);
2336    }
2337
2338    #[test]
2339    fn test_tokio_shutdown_multiple_signals() {
2340        let executor = tokio::Runner::default();
2341        test_shutdown_multiple_signals(executor);
2342    }
2343
2344    #[test]
2345    fn test_tokio_shutdown_timeout() {
2346        let executor = tokio::Runner::default();
2347        test_shutdown_timeout(executor);
2348    }
2349
2350    #[test]
2351    fn test_tokio_shutdown_multiple_stop_calls() {
2352        let executor = tokio::Runner::default();
2353        test_shutdown_multiple_stop_calls(executor);
2354    }
2355
2356    #[test]
2357    fn test_tokio_unfulfilled_shutdown() {
2358        let executor = tokio::Runner::default();
2359        test_unfulfilled_shutdown(executor);
2360    }
2361
2362    #[test]
2363    fn test_tokio_spawn_dedicated() {
2364        let executor = tokio::Runner::default();
2365        test_spawn_dedicated(executor);
2366    }
2367
2368    #[test]
2369    fn test_tokio_spawn() {
2370        let runner = tokio::Runner::default();
2371        test_spawn(runner);
2372    }
2373
2374    #[test]
2375    fn test_tokio_spawn_abort_on_parent_abort() {
2376        let runner = tokio::Runner::default();
2377        test_spawn_abort_on_parent_abort(runner);
2378    }
2379
2380    #[test]
2381    fn test_tokio_spawn_abort_on_parent_completion() {
2382        let runner = tokio::Runner::default();
2383        test_spawn_abort_on_parent_completion(runner);
2384    }
2385
2386    #[test]
2387    fn test_tokio_spawn_cascading_abort() {
2388        let runner = tokio::Runner::default();
2389        test_spawn_cascading_abort(runner);
2390    }
2391
2392    #[test]
2393    fn test_tokio_child_survives_sibling_completion() {
2394        let runner = tokio::Runner::default();
2395        test_child_survives_sibling_completion(runner);
2396    }
2397
2398    #[test]
2399    fn test_tokio_spawn_clone_chain() {
2400        let runner = tokio::Runner::default();
2401        test_spawn_clone_chain(runner);
2402    }
2403
2404    #[test]
2405    fn test_tokio_spawn_sparse_clone_chain() {
2406        let runner = tokio::Runner::default();
2407        test_spawn_sparse_clone_chain(runner);
2408    }
2409
2410    #[test]
2411    fn test_tokio_spawn_blocking() {
2412        for dedicated in [false, true] {
2413            let executor = tokio::Runner::default();
2414            test_spawn_blocking(executor, dedicated);
2415        }
2416    }
2417
2418    #[test]
2419    #[should_panic(expected = "blocking task panicked")]
2420    fn test_tokio_spawn_blocking_panic() {
2421        for dedicated in [false, true] {
2422            let executor = tokio::Runner::default();
2423            test_spawn_blocking_panic(executor, dedicated);
2424        }
2425    }
2426
2427    #[test]
2428    fn test_tokio_spawn_blocking_panic_caught() {
2429        for dedicated in [false, true] {
2430            let cfg = tokio::Config::default().with_catch_panics(true);
2431            let executor = tokio::Runner::new(cfg);
2432            test_spawn_blocking_panic_caught(executor, dedicated);
2433        }
2434    }
2435
2436    #[test]
2437    fn test_tokio_spawn_blocking_abort() {
2438        for (dedicated, blocking) in [(false, true), (true, false)] {
2439            let executor = tokio::Runner::default();
2440            test_spawn_abort(executor, dedicated, blocking);
2441        }
2442    }
2443
2444    #[test]
2445    fn test_tokio_circular_reference_prevents_cleanup() {
2446        let executor = tokio::Runner::default();
2447        test_circular_reference_prevents_cleanup(executor);
2448    }
2449
2450    #[test]
2451    fn test_tokio_late_waker() {
2452        let executor = tokio::Runner::default();
2453        test_late_waker(executor);
2454    }
2455
2456    #[test]
2457    fn test_tokio_metrics() {
2458        let executor = tokio::Runner::default();
2459        test_metrics(executor);
2460    }
2461
2462    #[test]
2463    #[should_panic]
2464    fn test_tokio_metrics_label() {
2465        let executor = tokio::Runner::default();
2466        test_metrics_label(executor);
2467    }
2468
2469    #[test]
2470    fn test_tokio_process_rss_metric() {
2471        let executor = tokio::Runner::default();
2472        executor.start(|context| async move {
2473            loop {
2474                // Wait for RSS metric to be available
2475                let metrics = context.encode();
2476                if !metrics.contains("runtime_process_rss") {
2477                    context.sleep(Duration::from_millis(100)).await;
2478                    continue;
2479                }
2480
2481                // Verify the RSS value is eventually populated (greater than 0)
2482                for line in metrics.lines() {
2483                    if line.starts_with("runtime_process_rss")
2484                        && !line.starts_with("runtime_process_rss{")
2485                    {
2486                        let parts: Vec<&str> = line.split_whitespace().collect();
2487                        if parts.len() >= 2 {
2488                            let rss_value: i64 =
2489                                parts[1].parse().expect("Failed to parse RSS value");
2490                            if rss_value > 0 {
2491                                return;
2492                            }
2493                        }
2494                    }
2495                }
2496            }
2497        });
2498    }
2499
2500    #[test]
2501    fn test_tokio_telemetry() {
2502        let executor = tokio::Runner::default();
2503        executor.start(|context| async move {
2504            // Define the server address
2505            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2506
2507            // Configure telemetry
2508            tokio::telemetry::init(
2509                context.with_label("metrics"),
2510                tokio::telemetry::Logging {
2511                    level: Level::INFO,
2512                    json: false,
2513                },
2514                Some(address),
2515                None,
2516            );
2517
2518            // Register a test metric
2519            let counter: Counter<u64> = Counter::default();
2520            context.register("test_counter", "Test counter", counter.clone());
2521            counter.inc();
2522
2523            // Helper functions to parse HTTP response
2524            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
2525                let mut line = Vec::new();
2526                loop {
2527                    let byte = stream.recv(vec![0; 1]).await?;
2528                    if byte[0] == b'\n' {
2529                        if line.last() == Some(&b'\r') {
2530                            line.pop(); // Remove trailing \r
2531                        }
2532                        break;
2533                    }
2534                    line.push(byte[0]);
2535                }
2536                String::from_utf8(line).map_err(|_| Error::ReadFailed)
2537            }
2538
2539            async fn read_headers<St: Stream>(
2540                stream: &mut St,
2541            ) -> Result<HashMap<String, String>, Error> {
2542                let mut headers = HashMap::new();
2543                loop {
2544                    let line = read_line(stream).await?;
2545                    if line.is_empty() {
2546                        break;
2547                    }
2548                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
2549                    if parts.len() == 2 {
2550                        headers.insert(parts[0].to_string(), parts[1].to_string());
2551                    }
2552                }
2553                Ok(headers)
2554            }
2555
2556            async fn read_body<St: Stream>(
2557                stream: &mut St,
2558                content_length: usize,
2559            ) -> Result<String, Error> {
2560                let read = stream.recv(vec![0; content_length]).await?;
2561                String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
2562            }
2563
2564            // Simulate a client connecting to the server
2565            let client_handle = context
2566                .with_label("client")
2567                .spawn(move |context| async move {
2568                    let (mut sink, mut stream) = loop {
2569                        match context.dial(address).await {
2570                            Ok((sink, stream)) => break (sink, stream),
2571                            Err(e) => {
2572                                // The client may be polled before the server is ready, that's alright!
2573                                error!(err =?e, "failed to connect");
2574                                context.sleep(Duration::from_millis(10)).await;
2575                            }
2576                        }
2577                    };
2578
2579                    // Send a GET request to the server
2580                    let request = format!(
2581                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2582                    );
2583                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
2584
2585                    // Read and verify the HTTP status line
2586                    let status_line = read_line(&mut stream).await.unwrap();
2587                    assert_eq!(status_line, "HTTP/1.1 200 OK");
2588
2589                    // Read and parse headers
2590                    let headers = read_headers(&mut stream).await.unwrap();
2591                    println!("Headers: {headers:?}");
2592                    let content_length = headers
2593                        .get("content-length")
2594                        .unwrap()
2595                        .parse::<usize>()
2596                        .unwrap();
2597
2598                    // Read and verify the body
2599                    let body = read_body(&mut stream, content_length).await.unwrap();
2600                    assert!(body.contains("test_counter_total 1"));
2601                });
2602
2603            // Wait for the client task to complete
2604            client_handle.await.unwrap();
2605        });
2606    }
2607
2608    #[test_collect_traces]
2609    fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2610        let executor = deterministic::Runner::new(deterministic::Config::default());
2611        executor.start(|context| async move {
2612            context
2613                .with_label("test")
2614                .instrumented()
2615                .spawn(|context| async move {
2616                    tracing::info!(field = "test field", "test log");
2617
2618                    context
2619                        .with_label("inner")
2620                        .instrumented()
2621                        .spawn(|_| async move {
2622                            tracing::info!("inner log");
2623                        })
2624                        .await
2625                        .unwrap();
2626                })
2627                .await
2628                .unwrap();
2629        });
2630
2631        let info_traces = traces.get_by_level(Level::INFO);
2632        assert_eq!(info_traces.len(), 2);
2633
2634        // Outer log (single span)
2635        info_traces
2636            .expect_event_at_index(0, |event| {
2637                event.metadata.expect_content_exact("test log")?;
2638                event.metadata.expect_field_count(1)?;
2639                event.metadata.expect_field_exact("field", "test field")?;
2640                event.expect_span_count(1)?;
2641                event.expect_span_at_index(0, |span| {
2642                    span.expect_content_exact("task")?;
2643                    span.expect_field_count(1)?;
2644                    span.expect_field_exact("name", "test")
2645                })
2646            })
2647            .unwrap();
2648
2649        info_traces
2650            .expect_event_at_index(1, |event| {
2651                event.metadata.expect_content_exact("inner log")?;
2652                event.metadata.expect_field_count(0)?;
2653                event.expect_span_count(1)?;
2654                event.expect_span_at_index(0, |span| {
2655                    span.expect_content_exact("task")?;
2656                    span.expect_field_count(1)?;
2657                    span.expect_field_exact("name", "test_inner")
2658                })
2659            })
2660            .unwrap();
2661    }
2662}