commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
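//! # Example
//!
//! A minimal sketch of driving a runtime generically over the traits in this
//! crate (any concrete runtime, such as one from the `tokio` or `deterministic`
//! modules, can stand in for `R`):
//!
//! ```ignore
//! use commonware_runtime::{Clock, Runner, Spawner};
//! use std::time::Duration;
//!
//! fn run<R: Runner>(runner: R)
//! where
//!     R::Context: Spawner + Clock,
//! {
//!     runner.start(|context| async move {
//!         // Spawn a background task and wait for its result.
//!         let handle = context.spawn(|context| async move {
//!             context.sleep(Duration::from_millis(10)).await;
//!             42
//!         });
//!         assert_eq!(handle.await.unwrap(), 42);
//!     });
//! }
//! ```
//!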
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20#![doc(
21    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22    html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_utils::StableBuf;
26use prometheus_client::registry::Metric;
27use std::{
28    future::Future,
29    io::Error as IoError,
30    net::SocketAddr,
31    time::{Duration, SystemTime},
32};
33use thiserror::Error;
34
35#[macro_use]
36mod macros;
37
38pub mod deterministic;
39pub mod mocks;
40cfg_if::cfg_if! {
41    if #[cfg(not(target_arch = "wasm32"))] {
42        pub mod tokio;
43        pub mod benchmarks;
44    }
45}
46mod network;
47mod process;
48mod storage;
49pub mod telemetry;
50mod utils;
51pub use utils::*;
52#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
53mod iouring;
54
55/// Prefix for runtime metrics.
56const METRICS_PREFIX: &str = "runtime";
57
58/// Errors that can occur when interacting with the runtime.
59#[derive(Error, Debug)]
60pub enum Error {
61    #[error("exited")]
62    Exited,
63    #[error("closed")]
64    Closed,
65    #[error("timeout")]
66    Timeout,
67    #[error("bind failed")]
68    BindFailed,
69    #[error("connection failed")]
70    ConnectionFailed,
71    #[error("write failed")]
72    WriteFailed,
73    #[error("read failed")]
74    ReadFailed,
75    #[error("send failed")]
76    SendFailed,
77    #[error("recv failed")]
78    RecvFailed,
79    #[error("partition creation failed: {0}")]
80    PartitionCreationFailed(String),
81    #[error("partition missing: {0}")]
82    PartitionMissing(String),
83    #[error("partition corrupt: {0}")]
84    PartitionCorrupt(String),
85    #[error("blob open failed: {0}/{1} error: {2}")]
86    BlobOpenFailed(String, String, IoError),
87    #[error("blob missing: {0}/{1}")]
88    BlobMissing(String, String),
89    #[error("blob resize failed: {0}/{1} error: {2}")]
90    BlobResizeFailed(String, String, IoError),
91    #[error("blob sync failed: {0}/{1} error: {2}")]
92    BlobSyncFailed(String, String, IoError),
93    #[error("blob insufficient length")]
94    BlobInsufficientLength,
95    #[error("offset overflow")]
96    OffsetOverflow,
97    #[error("io error: {0}")]
98    Io(#[from] IoError),
99}
100
101/// Interface that any task scheduler must implement to start
102/// running tasks.
103pub trait Runner {
104    /// Context defines the environment available to tasks.
105    /// Examples of services provided by the context include:
106    /// - [Clock] for time-based operations
107    /// - [Network] for network operations
108    /// - [Storage] for storage operations
109    type Context;
110
111    /// Start running a root task.
112    fn start<F, Fut>(self, f: F) -> Fut::Output
113    where
114        F: FnOnce(Self::Context) -> Fut,
115        Fut: Future;
116}
117
118/// Interface that any task scheduler must implement to spawn tasks.
119pub trait Spawner: Clone + Send + Sync + 'static {
120    /// Enqueue a task to be executed.
121    ///
122    /// Unlike a future, a spawned task will start executing immediately (even if the caller
123    /// does not await the handle).
124    ///
125    /// Spawned tasks consume the context used to create them. This ensures that a context cannot
126    /// be shared between tasks and that every task's context has a well-defined origin.
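    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `context` was obtained from [Runner::start]):
    ///
    /// ```ignore
    /// // The context is consumed; the closure receives the spawned task's own context.
    /// let handle = context.spawn(|_context| async move { 2 + 2 });
    /// assert_eq!(handle.await.unwrap(), 4);
    /// ```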
127    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
128    where
129        F: FnOnce(Self) -> Fut + Send + 'static,
130        Fut: Future<Output = T> + Send + 'static,
131        T: Send + 'static;
132
133    /// Enqueue a task to be executed (without consuming the context).
134    ///
135    /// The semantics are the same as [Spawner::spawn].
136    ///
137    /// # Warning
138    ///
139    /// If this function is used to spawn multiple tasks from the same context (including child
140    /// tasks), the runtime will panic to prevent accidental misuse.
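    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a mutable `context` obtained from [Runner::start]);
    /// per the warning above, spawning again from the same context panics:
    ///
    /// ```ignore
    /// let spawn = context.spawn_ref();
    /// let handle = spawn(async move { 2 + 2 });
    /// assert_eq!(handle.await.unwrap(), 4);
    /// ```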
141    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
142    where
143        F: Future<Output = T> + Send + 'static,
144        T: Send + 'static;
145
146    /// Enqueue a child task to be executed that will be automatically aborted when the
147    /// parent task completes or is aborted.
148    ///
149    /// The spawned task will be tracked as a child of the current task. When
150    /// the parent task completes (either successfully or via abort), all child
151    /// tasks will be automatically aborted.
152    ///
153    /// # Context cloning and children
154    ///
155    /// When a context is cloned (via `Clone::clone`) or a new context is created (via methods
156    /// like `with_label`), you get another reference to the same context. However:
157    /// - Tasks spawned with `spawn` from any context (original or cloned) are always independent
158    /// - Only tasks spawned with `spawn_child` become children of the current task
159    /// - Child tasks are tied to the task that spawned them, not to the context itself
160    ///
161    /// # Note
162    ///
163    /// Only async tasks can be spawned as children, since blocking tasks cannot be
164    /// aborted and therefore can't support parent-child relationships.
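    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `context` was obtained from [Runner::start]); the
    /// child is aborted once its parent completes:
    ///
    /// ```ignore
    /// let parent = context.spawn(|context| async move {
    ///     // Child that would run forever; it is aborted when the parent returns.
    ///     context.spawn_child(|_| std::future::pending::<()>())
    /// });
    ///
    /// // The child resolves with an error because its parent completed.
    /// let child = parent.await.unwrap();
    /// assert!(child.await.is_err());
    /// ```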
165    fn spawn_child<F, Fut, T>(self, f: F) -> Handle<T>
166    where
167        F: FnOnce(Self) -> Fut + Send + 'static,
168        Fut: Future<Output = T> + Send + 'static,
169        T: Send + 'static;
170
171    /// Enqueue a blocking task to be executed.
172    ///
173    /// This method is designed for synchronous, potentially long-running operations. Tasks can be executed
174    /// either on a shared thread (for tasks that are expected to finish on their own) or on a dedicated
175    /// thread (for tasks that are expected to run indefinitely).
176    ///
177    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
178    /// result.
179    ///
180    /// # Motivation
181    ///
182    /// Most runtimes allocate a limited number of threads for executing async tasks, running whichever
183    /// tasks aren't currently waiting. If blocking tasks are spawned onto those threads, they can
184    /// dramatically reduce the efficiency of the runtime.
185    ///
186    /// # Warning
187    ///
188    /// Blocking tasks cannot be aborted.
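    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `context` was obtained from [Runner::start]):
    ///
    /// ```ignore
    /// // Run CPU-bound work on a shared blocking thread (`dedicated = false`).
    /// let handle = context.spawn_blocking(false, |_| (0..1_000u64).sum::<u64>());
    /// assert_eq!(handle.await.unwrap(), 499_500);
    /// ```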
189    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
190    where
191        F: FnOnce(Self) -> T + Send + 'static,
192        T: Send + 'static;
193
194    /// Enqueue a blocking task to be executed (without consuming the context).
195    ///
196    /// The semantics are the same as [Spawner::spawn_blocking].
197    ///
198    /// # Warning
199    ///
200    /// If this function is used to spawn multiple tasks from the same context,
201    /// the runtime will panic to prevent accidental misuse.
202    fn spawn_blocking_ref<F, T>(
203        &mut self,
204        dedicated: bool,
205    ) -> impl FnOnce(F) -> Handle<T> + 'static
206    where
207        F: FnOnce() -> T + Send + 'static,
208        T: Send + 'static;
209
210    /// Signals the runtime to stop execution and waits for all outstanding tasks
211    /// to perform any required cleanup and exit.
212    ///
213    /// This method does not actually kill any tasks but rather signals to them, using
214    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
215    /// It then waits for all [signal::Signal] references to be dropped before returning.
216    ///
217    /// ## Multiple Stop Calls
218    ///
219    /// This method is idempotent and safe to call multiple times concurrently (on
220    /// different instances of the same context since it consumes `self`). The first
221    /// call initiates shutdown with the provided `value`, and all subsequent calls
222    /// will wait for the same completion regardless of their `value` parameter, i.e.
223    /// the original `value` from the first call is preserved.
224    ///
225    /// ## Timeout
226    ///
227    /// If a timeout is provided, the method will return an error if all [signal::Signal]
228    /// references have not been dropped within the specified duration.
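    ///
    /// # Example
    ///
    /// A minimal sketch of a graceful shutdown (assumes `context` was obtained from
    /// [Runner::start]):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// context.clone().spawn(|context| async move {
    ///     // Hold the signal reference until cleanup is complete.
    ///     let mut signal = context.stopped();
    ///     let _value = (&mut signal).await;
    ///     // ... perform cleanup here ...
    ///     drop(signal);
    /// });
    ///
    /// // Signal shutdown and wait (up to 5s) for all signal holders to exit.
    /// context.stop(0, Some(Duration::from_secs(5))).await.unwrap();
    /// ```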
229    fn stop(
230        self,
231        value: i32,
232        timeout: Option<Duration>,
233    ) -> impl Future<Output = Result<(), Error>> + Send;
234
235    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
236    /// any task.
237    ///
238    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
239    /// immediately. The [signal::Signal] returned will always resolve to the value of the
240    /// first [Spawner::stop] call.
241    fn stopped(&self) -> signal::Signal;
242}
243
244/// Interface to register and encode metrics.
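///
/// # Example
///
/// A minimal sketch of labeling a context and registering a counter (assumes
/// `context` was obtained from [Runner::start] and that the counter is encoded
/// with the conventional `_total` suffix, as in the tests below):
///
/// ```ignore
/// use prometheus_client::metrics::counter::Counter;
///
/// let context = context.with_label("engine");
/// let requests = Counter::<u64>::default();
/// context.register("requests", "Number of requests processed", requests.clone());
/// requests.inc();
/// assert!(context.encode().contains("requests_total 1"));
/// ```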
245pub trait Metrics: Clone + Send + Sync + 'static {
246    /// Get the current label of the context.
247    fn label(&self) -> String;
248
249    /// Create a new instance of `Metrics` with the given label appended to the end
250    /// of the current `Metrics` label.
251    ///
252    /// This is commonly used to create a nested context for `register`.
253    ///
254    /// No implementation is permitted to use `METRICS_PREFIX` as the start of a
255    /// label (it is reserved for the runtime's own metrics).
256    fn with_label(&self, label: &str) -> Self;
257
258    /// Prefix the given label with the current context's label.
259    ///
260    /// Unlike `with_label`, this method does not create a new context.
261    fn scoped_label(&self, label: &str) -> String {
262        let label = if self.label().is_empty() {
263            label.to_string()
264        } else {
265            format!("{}_{}", self.label(), label)
266        };
267        assert!(
268            !label.starts_with(METRICS_PREFIX),
269            "using runtime label is not allowed"
270        );
271        label
272    }
273
274    /// Register a metric with the runtime.
275    ///
276    /// Any registered metric will include (as a prefix) the label of the current context.
277    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
278
279    /// Encode all metrics into a buffer.
280    fn encode(&self) -> String;
281}
282
283/// Interface that any task scheduler must implement to provide
284/// time-based operations.
285///
286/// It is necessary to mock time to provide deterministic execution
287/// of arbitrary tasks.
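///
/// # Example
///
/// A minimal sketch of sleeping until a computed deadline (assumes `context` was
/// obtained from [Runner::start]):
///
/// ```ignore
/// use std::time::Duration;
///
/// let deadline = context.current() + Duration::from_millis(100);
/// context.sleep_until(deadline).await;
/// assert!(context.current() >= deadline);
/// ```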
288pub trait Clock: Clone + Send + Sync + 'static {
289    /// Returns the current time.
290    fn current(&self) -> SystemTime;
291
292    /// Sleep for the given duration.
293    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
294
295    /// Sleep until the given deadline.
296    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
297}
298
299/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
300pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
301
302/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
303pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
304
305/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
306pub type ListenerOf<N> = <N as crate::Network>::Listener;
307
308/// Interface that any runtime must implement to create
309/// network connections.
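///
/// # Example
///
/// A minimal sketch of a listener echoing one message back to a dialer (assumes
/// `context` was obtained from [Runner::start] and also implements [Spawner]):
///
/// ```ignore
/// let mut listener = context.bind("127.0.0.1:0".parse().unwrap()).await.unwrap();
/// let addr = listener.local_addr().unwrap();
///
/// // Accept a single connection and echo four bytes back.
/// context.clone().spawn(|_| async move {
///     let (_, mut sink, mut stream) = listener.accept().await.unwrap();
///     let msg = stream.recv(vec![0u8; 4]).await.unwrap();
///     sink.send(msg).await.unwrap();
/// });
///
/// // Dial the listener and round-trip a message.
/// let (mut sink, mut stream) = context.dial(addr).await.unwrap();
/// sink.send(vec![1u8, 2, 3, 4]).await.unwrap();
/// let echoed = stream.recv(vec![0u8; 4]).await.unwrap();
/// assert_eq!(echoed.as_ref(), &[1, 2, 3, 4]);
/// ```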
310pub trait Network: Clone + Send + Sync + 'static {
311    /// The type of [Listener] that's returned when binding to a socket.
312    /// Accepting a connection returns a [Sink] and [Stream] which are defined
313    /// by the [Listener] and used to send and receive data over the connection.
314    type Listener: Listener;
315
316    /// Bind to the given socket address.
317    fn bind(
318        &self,
319        socket: SocketAddr,
320    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
321
322    /// Dial the given socket address.
323    fn dial(
324        &self,
325        socket: SocketAddr,
326    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
327}
328
329/// Interface that any runtime must implement to handle
330/// incoming network connections.
331pub trait Listener: Sync + Send + 'static {
332    /// The type of [Sink] that's returned when accepting a connection.
333    /// This is used to send data to the remote connection.
334    type Sink: Sink;
335    /// The type of [Stream] that's returned when accepting a connection.
336    /// This is used to receive data from the remote connection.
337    type Stream: Stream;
338
339    /// Accept an incoming connection.
340    fn accept(
341        &mut self,
342    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
343
344    /// Returns the local address of the listener.
345    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
346}
347
348/// Interface that any runtime must implement to send
349/// messages over a network connection.
350pub trait Sink: Sync + Send + 'static {
351    /// Send a message to the sink.
352    fn send(
353        &mut self,
354        msg: impl Into<StableBuf> + Send,
355    ) -> impl Future<Output = Result<(), Error>> + Send;
356}
357
358/// Interface that any runtime must implement to receive
359/// messages over a network connection.
360pub trait Stream: Sync + Send + 'static {
361    /// Receive a message from the stream, storing it in the given buffer.
362    /// Reads exactly the number of bytes that fit in the buffer.
363    fn recv(
364        &mut self,
365        buf: impl Into<StableBuf> + Send,
366    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
367}
368
369/// Interface to interact with storage.
370///
372/// To support storage implementations that enable concurrent reads and
373/// writes, blobs are responsible for maintaining synchronization.
374///
375/// Storage can be backed by a local filesystem, cloud storage, etc.
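///
/// # Example
///
/// A minimal sketch of writing and reading back a blob (assumes `context` was
/// obtained from [Runner::start]):
///
/// ```ignore
/// let (blob, _len) = context.open("partition", b"blob").await.unwrap();
///
/// blob.write_at(b"hello".to_vec(), 0).await.unwrap();
/// blob.sync().await.unwrap();
///
/// let read = blob.read_at(vec![0u8; 5], 0).await.unwrap();
/// assert_eq!(read.as_ref(), b"hello");
/// ```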
376pub trait Storage: Clone + Send + Sync + 'static {
377    /// The readable/writeable storage buffer that can be opened by this Storage.
378    type Blob: Blob;
379
380    /// Open an existing blob in a given partition or create a new one, returning
381    /// the blob and its length.
382    ///
383    /// Multiple instances of the same blob can be opened concurrently; however,
384    /// writing to the same blob concurrently may lead to undefined behavior.
385    fn open(
386        &self,
387        partition: &str,
388        name: &[u8],
389    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
390
391    /// Remove a blob from a given partition.
392    ///
393    /// If no `name` is provided, the entire partition is removed.
394    fn remove(
395        &self,
396        partition: &str,
397        name: Option<&[u8]>,
398    ) -> impl Future<Output = Result<(), Error>> + Send;
399
400    /// Return all blobs in a given partition.
401    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
402}
403
404/// Interface to read and write to a blob.
405///
406/// To support blob implementations that enable concurrent reads and
407/// writes, blobs are responsible for maintaining synchronization.
408///
409/// Cloning a blob is similar to wrapping a single file descriptor in
410/// a lock, whereas opening a new blob (of the same name) is similar to
411/// opening a new file descriptor. If multiple blobs are opened with the same
412/// name, they are not expected to coordinate access to underlying storage
413/// and writing to both is undefined behavior.
414///
415/// When a blob is dropped, any unsynced changes may be discarded. Implementations
416/// may attempt to sync during drop but errors will go unhandled. Call `sync`
417/// before dropping to ensure all changes are durably persisted.
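///
/// # Example
///
/// A minimal sketch of extending a blob with [Blob::resize] (assumes `blob` was
/// opened via [Storage::open] and currently holds 5 bytes):
///
/// ```ignore
/// // Double the length; the new tail is zero-filled.
/// blob.resize(10).await.unwrap();
/// let tail = blob.read_at(vec![0u8; 5], 5).await.unwrap();
/// assert_eq!(tail.as_ref(), &[0u8; 5]);
/// blob.sync().await.unwrap();
/// ```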
418#[allow(clippy::len_without_is_empty)]
419pub trait Blob: Clone + Send + Sync + 'static {
420    /// Read from the blob at the given offset.
421    ///
422    /// `read_at` does not return the number of bytes read because it
423    /// only returns once the entire buffer has been filled.
424    fn read_at(
425        &self,
426        buf: impl Into<StableBuf> + Send,
427        offset: u64,
428    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
429
430    /// Write `buf` to the blob at the given offset.
431    fn write_at(
432        &self,
433        buf: impl Into<StableBuf> + Send,
434        offset: u64,
435    ) -> impl Future<Output = Result<(), Error>> + Send;
436
437    /// Resize the blob to the given length.
438    ///
439    /// If the length is greater than the current length, the blob is extended with zeros.
440    /// If the length is less than the current length, the blob is truncated.
441    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
442
443    /// Ensure all pending data is durably persisted.
444    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use bytes::Bytes;
451    use commonware_macros::select;
452    use futures::{
453        channel::{mpsc, oneshot},
454        future::{pending, ready},
455        join, pin_mut, FutureExt, SinkExt, StreamExt,
456    };
457    use prometheus_client::metrics::counter::Counter;
458    use std::{
459        collections::HashMap,
460        panic::{catch_unwind, AssertUnwindSafe},
461        str::FromStr,
462        sync::{
463            atomic::{AtomicU32, Ordering},
464            Arc, Mutex,
465        },
466    };
467    use tracing::{error, Level};
468    use utils::reschedule;
469
470    fn test_error_future<R: Runner>(runner: R) {
471        async fn error_future() -> Result<&'static str, &'static str> {
472            Err("An error occurred")
473        }
474        let result = runner.start(|_| error_future());
475        assert_eq!(result, Err("An error occurred"));
476    }
477
478    fn test_clock_sleep<R: Runner>(runner: R)
479    where
480        R::Context: Spawner + Clock,
481    {
482        runner.start(|context| async move {
483            // Capture initial time
484            let start = context.current();
485            let sleep_duration = Duration::from_millis(10);
486            context.sleep(sleep_duration).await;
487
488            // After run, time should have advanced
489            let end = context.current();
490            assert!(end.duration_since(start).unwrap() >= sleep_duration);
491        });
492    }
493
494    fn test_clock_sleep_until<R: Runner>(runner: R)
495    where
496        R::Context: Spawner + Clock,
497    {
498        runner.start(|context| async move {
499            // Trigger sleep
500            let now = context.current();
501            context.sleep_until(now + Duration::from_millis(100)).await;
502
503            // Ensure slept duration has elapsed
504            let elapsed = now.elapsed().unwrap();
505            assert!(elapsed >= Duration::from_millis(100));
506        });
507    }
508
509    fn test_root_finishes<R: Runner>(runner: R)
510    where
511        R::Context: Spawner,
512    {
513        runner.start(|context| async move {
514            context.spawn(|_| async move {
515                loop {
516                    reschedule().await;
517                }
518            });
519        });
520    }
521
522    fn test_spawn_abort<R: Runner>(runner: R)
523    where
524        R::Context: Spawner,
525    {
526        runner.start(|context| async move {
527            let handle = context.spawn(|_| async move {
528                loop {
529                    reschedule().await;
530                }
531            });
532            handle.abort();
533            assert!(matches!(handle.await, Err(Error::Closed)));
534        });
535    }
536
537    fn test_panic_aborts_root<R: Runner>(runner: R) {
538        let result = catch_unwind(AssertUnwindSafe(|| {
539            runner.start(|_| async move {
540                panic!("blah");
541            });
542        }));
543        result.unwrap_err();
544    }
545
546    fn test_panic_aborts_spawn<R: Runner>(runner: R)
547    where
548        R::Context: Spawner,
549    {
550        let result = runner.start(|context| async move {
551            let result = context.spawn(|_| async move {
552                panic!("blah");
553            });
554            assert!(matches!(result.await, Err(Error::Exited)));
555            Result::<(), Error>::Ok(())
556        });
557
558        // Ensure panic was caught
559        result.unwrap();
560    }
561
562    fn test_select<R: Runner>(runner: R) {
563        runner.start(|_| async move {
564            // Test first branch
565            let output = Mutex::new(0);
566            select! {
567                v1 = ready(1) => {
568                    *output.lock().unwrap() = v1;
569                },
570                v2 = ready(2) => {
571                    *output.lock().unwrap() = v2;
572                },
573            };
574            assert_eq!(*output.lock().unwrap(), 1);
575
576            // Test second branch
577            select! {
578                v1 = std::future::pending::<i32>() => {
579                    *output.lock().unwrap() = v1;
580                },
581                v2 = ready(2) => {
582                    *output.lock().unwrap() = v2;
583                },
584            };
585            assert_eq!(*output.lock().unwrap(), 2);
586        });
587    }
588
589    /// Ensure future fusing works as expected.
590    fn test_select_loop<R: Runner>(runner: R)
591    where
592        R::Context: Clock,
593    {
594        runner.start(|context| async move {
595            // Should hit timeout
596            let (mut sender, mut receiver) = mpsc::unbounded();
597            for _ in 0..2 {
598                select! {
599                    v = receiver.next() => {
600                        panic!("unexpected value: {v:?}");
601                    },
602                    _ = context.sleep(Duration::from_millis(100)) => {
603                        continue;
604                    },
605                };
606            }
607
608            // Populate channel
609            sender.send(0).await.unwrap();
610            sender.send(1).await.unwrap();
611
612            // Prefer not reading channel without losing messages
613            select! {
614                _ = async {} => {
615                    // Skip reading from channel even though populated
616                },
617                v = receiver.next() => {
618                    panic!("unexpected value: {v:?}");
619                },
620            };
621
622            // Process messages
623            for i in 0..2 {
624                select! {
625                    _ = context.sleep(Duration::from_millis(100)) => {
626                        panic!("timeout");
627                    },
628                    v = receiver.next() => {
629                        assert_eq!(v.unwrap(), i);
630                    },
631                };
632            }
633        });
634    }
635
636    fn test_storage_operations<R: Runner>(runner: R)
637    where
638        R::Context: Storage,
639    {
640        runner.start(|context| async move {
641            let partition = "test_partition";
642            let name = b"test_blob";
643
644            // Open a new blob
645            let (blob, _) = context
646                .open(partition, name)
647                .await
648                .expect("Failed to open blob");
649
650            // Write data to the blob
651            let data = b"Hello, Storage!";
652            blob.write_at(Vec::from(data), 0)
653                .await
654                .expect("Failed to write to blob");
655
656            // Sync the blob
657            blob.sync().await.expect("Failed to sync blob");
658
659            // Read data from the blob
660            let read = blob
661                .read_at(vec![0; data.len()], 0)
662                .await
663                .expect("Failed to read from blob");
664            assert_eq!(read.as_ref(), data);
665
666            // Sync the blob
667            blob.sync().await.expect("Failed to sync blob");
668
669            // Scan blobs in the partition
670            let blobs = context
671                .scan(partition)
672                .await
673                .expect("Failed to scan partition");
674            assert!(blobs.contains(&name.to_vec()));
675
676            // Reopen the blob
677            let (blob, len) = context
678                .open(partition, name)
679                .await
680                .expect("Failed to reopen blob");
681            assert_eq!(len, data.len() as u64);
682
683            // Read part of the message back
684            let read = blob
685                .read_at(vec![0u8; 7], 7)
686                .await
687                .expect("Failed to read data");
688            assert_eq!(read.as_ref(), b"Storage");
689
690            // Sync the blob
691            blob.sync().await.expect("Failed to sync blob");
692
693            // Remove the blob
694            context
695                .remove(partition, Some(name))
696                .await
697                .expect("Failed to remove blob");
698
699            // Ensure the blob is removed
700            let blobs = context
701                .scan(partition)
702                .await
703                .expect("Failed to scan partition");
704            assert!(!blobs.contains(&name.to_vec()));
705
706            // Remove the partition
707            context
708                .remove(partition, None)
709                .await
710                .expect("Failed to remove partition");
711
712            // Scan the partition
713            let result = context.scan(partition).await;
714            assert!(matches!(result, Err(Error::PartitionMissing(_))));
715        });
716    }
717
718    fn test_blob_read_write<R: Runner>(runner: R)
719    where
720        R::Context: Storage,
721    {
722        runner.start(|context| async move {
723            let partition = "test_partition";
724            let name = b"test_blob_rw";
725
726            // Open a new blob
727            let (blob, _) = context
728                .open(partition, name)
729                .await
730                .expect("Failed to open blob");
731
732            // Write data at different offsets
733            let data1 = b"Hello";
734            let data2 = b"World";
735            blob.write_at(Vec::from(data1), 0)
736                .await
737                .expect("Failed to write data1");
738            blob.write_at(Vec::from(data2), 5)
739                .await
740                .expect("Failed to write data2");
741
742            // Read data back
743            let read = blob
744                .read_at(vec![0u8; 10], 0)
745                .await
746                .expect("Failed to read data");
747            assert_eq!(&read.as_ref()[..5], data1);
748            assert_eq!(&read.as_ref()[5..], data2);
749
750            // Read past end of blob
751            let result = blob.read_at(vec![0u8; 10], 10).await;
752            assert!(result.is_err());
753
754            // Rewrite data without affecting length
755            let data3 = b"Store";
756            blob.write_at(Vec::from(data3), 5)
757                .await
758                .expect("Failed to write data3");
759
760            // Read data back
761            let read = blob
762                .read_at(vec![0u8; 10], 0)
763                .await
764                .expect("Failed to read data");
765            assert_eq!(&read.as_ref()[..5], data1);
766            assert_eq!(&read.as_ref()[5..], data3);
767
768            // Read past end of blob
769            let result = blob.read_at(vec![0u8; 10], 10).await;
770            assert!(result.is_err());
771        });
772    }
773
774    fn test_blob_resize<R: Runner>(runner: R)
775    where
776        R::Context: Storage,
777    {
778        runner.start(|context| async move {
779            let partition = "test_partition_resize";
780            let name = b"test_blob_resize";
781
782            // Open and write to a new blob
783            let (blob, _) = context
784                .open(partition, name)
785                .await
786                .expect("Failed to open blob");
787
788            let data = b"some data";
789            blob.write_at(data.to_vec(), 0)
790                .await
791                .expect("Failed to write");
792            blob.sync().await.expect("Failed to sync after write");
793
794            // Re-open and check length
795            let (blob, len) = context.open(partition, name).await.unwrap();
796            assert_eq!(len, data.len() as u64);
797
798            // Resize to extend the file
799            let new_len = (data.len() as u64) * 2;
800            blob.resize(new_len)
801                .await
802                .expect("Failed to resize to extend");
803            blob.sync().await.expect("Failed to sync after resize");
804
805            // Re-open and check length again
806            let (blob, len) = context.open(partition, name).await.unwrap();
807            assert_eq!(len, new_len);
808
809            // Read original data
810            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
811            assert_eq!(read_buf.as_ref(), data);
812
813            // Read extended part (should be zeros)
814            let extended_part = blob
815                .read_at(vec![0; data.len()], data.len() as u64)
816                .await
817                .unwrap();
818            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
819
820            // Truncate the blob
821            blob.resize(data.len() as u64).await.unwrap();
822            blob.sync().await.unwrap();
823
824            // Reopen to check truncation
825            let (blob, size) = context.open(partition, name).await.unwrap();
826            assert_eq!(size, data.len() as u64);
827
828            // Read truncated data
829            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
830            assert_eq!(read_buf.as_ref(), data);
831            blob.sync().await.unwrap();
832        });
833    }
834
835    fn test_many_partition_read_write<R: Runner>(runner: R)
836    where
837        R::Context: Storage,
838    {
839        runner.start(|context| async move {
840            let partitions = ["partition1", "partition2", "partition3"];
841            let name = b"test_blob_rw";
842            let data1 = b"Hello";
843            let data2 = b"World";
844
845            for (additional, partition) in partitions.iter().enumerate() {
846                // Open a new blob
847                let (blob, _) = context
848                    .open(partition, name)
849                    .await
850                    .expect("Failed to open blob");
851
852                // Write data at different offsets
853                blob.write_at(Vec::from(data1), 0)
854                    .await
855                    .expect("Failed to write data1");
856                blob.write_at(Vec::from(data2), 5 + additional as u64)
857                    .await
858                    .expect("Failed to write data2");
859
860                // Sync the blob
861                blob.sync().await.expect("Failed to sync blob");
862            }
863
864            for (additional, partition) in partitions.iter().enumerate() {
865                // Open a new blob
866                let (blob, len) = context
867                    .open(partition, name)
868                    .await
869                    .expect("Failed to open blob");
870                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
871
872                // Read data back
873                let read = blob
874                    .read_at(vec![0u8; 10 + additional], 0)
875                    .await
876                    .expect("Failed to read data");
877                assert_eq!(&read.as_ref()[..5], b"Hello");
878                assert_eq!(&read.as_ref()[5 + additional..], b"World");
879            }
880        });
881    }
882
883    fn test_blob_read_past_length<R: Runner>(runner: R)
884    where
885        R::Context: Storage,
886    {
887        runner.start(|context| async move {
888            let partition = "test_partition";
889            let name = b"test_blob_rw";
890
891            // Open a new blob
892            let (blob, _) = context
893                .open(partition, name)
894                .await
895                .expect("Failed to open blob");
896
897            // Read data past file length (empty file)
898            let result = blob.read_at(vec![0u8; 10], 0).await;
899            assert!(result.is_err());
900
901            // Write data to the blob
902            let data = b"Hello, Storage!".to_vec();
903            blob.write_at(data, 0)
904                .await
905                .expect("Failed to write to blob");
906
907            // Read data past file length (non-empty file)
908            let result = blob.read_at(vec![0u8; 20], 0).await;
909            assert!(result.is_err());
910        })
911    }
912
913    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
914    where
915        R::Context: Spawner + Storage + Metrics,
916    {
917        runner.start(|context| async move {
918            let partition = "test_partition";
919            let name = b"test_blob_rw";
920
921            // Open a new blob
922            let (blob, _) = context
923                .open(partition, name)
924                .await
925                .expect("Failed to open blob");
926
927            // Write data to the blob
928            let data = b"Hello, Storage!";
929            blob.write_at(Vec::from(data), 0)
930                .await
931                .expect("Failed to write to blob");
932
933            // Sync the blob
934            blob.sync().await.expect("Failed to sync blob");
935
936            // Read data from the blob in clone
937            let check1 = context.with_label("check1").spawn({
938                let blob = blob.clone();
939                move |_| async move {
940                    let read = blob
941                        .read_at(vec![0u8; data.len()], 0)
942                        .await
943                        .expect("Failed to read from blob");
944                    assert_eq!(read.as_ref(), data);
945                }
946            });
947            let check2 = context.with_label("check2").spawn({
948                let blob = blob.clone();
949                move |_| async move {
950                    let read = blob
951                        .read_at(vec![0; data.len()], 0)
952                        .await
953                        .expect("Failed to read from blob");
954                    assert_eq!(read.as_ref(), data);
955                }
956            });
957
958            // Wait for both reads to complete
959            let result = join!(check1, check2);
960            assert!(result.0.is_ok());
961            assert!(result.1.is_ok());
962
963            // Read data from the blob
964            let read = blob
965                .read_at(vec![0; data.len()], 0)
966                .await
967                .expect("Failed to read from blob");
968            assert_eq!(read.as_ref(), data);
969
970            // Drop the blob
971            drop(blob);
972
973            // Ensure no blobs still open
974            let buffer = context.encode();
975            assert!(buffer.contains("open_blobs 0"));
976        });
977    }
978
979    fn test_shutdown<R: Runner>(runner: R)
980    where
981        R::Context: Spawner + Metrics + Clock,
982    {
983        let kill = 9;
984        runner.start(|context| async move {
985            // Spawn a task that waits for signal
986            let before = context
987                .with_label("before")
988                .spawn(move |context| async move {
989                    let mut signal = context.stopped();
990                    let value = (&mut signal).await.unwrap();
991                    assert_eq!(value, kill);
992                    drop(signal);
993                });
994
995            // Signal the tasks and wait for them to stop
996            let result = context.clone().stop(kill, None).await;
997            assert!(result.is_ok());
998
999            // Spawn a task after stop is called
1000            let after = context
1001                .with_label("after")
1002                .spawn(move |context| async move {
1003                    // A call to `stopped()` after `stop()` resolves immediately
1004                    let value = context.stopped().await.unwrap();
1005                    assert_eq!(value, kill);
1006                });
1007
1008            // Ensure both tasks complete
1009            let result = join!(before, after);
1010            assert!(result.0.is_ok());
1011            assert!(result.1.is_ok());
1012        });
1013    }
1014
1015    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1016    where
1017        R::Context: Spawner + Metrics + Clock,
1018    {
1019        let kill = 42;
1020        runner.start(|context| async move {
1021            let (started_tx, mut started_rx) = mpsc::channel(3);
1022            let counter = Arc::new(AtomicU32::new(0));
1023
1024            // Spawn 3 tasks that do cleanup work after receiving stop signal
1025            // and increment a shared counter
1026            let task = |cleanup_duration: Duration| {
1027                let context = context.clone();
1028                let counter = counter.clone();
1029                let mut started_tx = started_tx.clone();
1030                context.spawn(move |context| async move {
1031                    // Wait for signal to be acquired
1032                    let mut signal = context.stopped();
1033                    started_tx.send(()).await.unwrap();
1034
1035                    // Increment once killed
1036                    let value = (&mut signal).await.unwrap();
1037                    assert_eq!(value, kill);
1038                    context.sleep(cleanup_duration).await;
1039                    counter.fetch_add(1, Ordering::SeqCst);
1040
1041                    // Wait to drop signal until work has been done
1042                    drop(signal);
1043                })
1044            };
1045
1046            let task1 = task(Duration::from_millis(10));
1047            let task2 = task(Duration::from_millis(20));
1048            let task3 = task(Duration::from_millis(30));
1049
1050            // Give tasks time to start
1051            for _ in 0..3 {
1052                started_rx.next().await.unwrap();
1053            }
1054
1055            // Stop and verify all cleanup completed
1056            context.stop(kill, None).await.unwrap();
1057            assert_eq!(counter.load(Ordering::SeqCst), 3);
1058
1059            // Ensure all tasks completed
1060            let result = join!(task1, task2, task3);
1061            assert!(result.0.is_ok());
1062            assert!(result.1.is_ok());
1063            assert!(result.2.is_ok());
1064        });
1065    }
1066
1067    fn test_shutdown_timeout<R: Runner>(runner: R)
1068    where
1069        R::Context: Spawner + Metrics + Clock,
1070    {
1071        let kill = 42;
1072        runner.start(|context| async move {
1073            // Setup startup coordinator
1074            let (started_tx, started_rx) = oneshot::channel();
1075
1076            // Spawn a task that never completes its cleanup
1077            context.clone().spawn(move |context| async move {
1078                let signal = context.stopped();
1079                started_tx.send(()).unwrap();
1080                pending::<()>().await;
1081                signal.await.unwrap();
1082            });
1083
1084            // Try to stop with a timeout
1085            started_rx.await.unwrap();
1086            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1087
1088            // Assert that we got a timeout error
1089            assert!(matches!(result, Err(Error::Timeout)));
1090        });
1091    }
1092
1093    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1094    where
1095        R::Context: Spawner + Metrics + Clock,
1096    {
1097        let kill1 = 42;
1098        let kill2 = 43;
1099
1100        runner.start(|context| async move {
1101            let (started_tx, started_rx) = oneshot::channel();
1102            let counter = Arc::new(AtomicU32::new(0));
1103
1104            // Spawn a task that delays completion to test timing
1105            let task = context.with_label("blocking_task").spawn({
1106                let counter = counter.clone();
1107                move |context| async move {
1108                    // Wait for signal to be acquired
1109                    let mut signal = context.stopped();
1110                    started_tx.send(()).unwrap();
1111
1112                    // Wait for signal to be resolved
1113                    let value = (&mut signal).await.unwrap();
1114                    assert_eq!(value, kill1);
1115                    context.sleep(Duration::from_millis(50)).await;
1116
1117                    // Increment counter
1118                    counter.fetch_add(1, Ordering::SeqCst);
1119                    drop(signal);
1120                }
1121            });
1122
1123            // Give task time to start
1124            started_rx.await.unwrap();
1125
1126            // Issue two separate stop calls
1127            // The second stop call uses a different stop value that should be ignored
1128            let stop_task1 = context.clone().stop(kill1, None);
1129            pin_mut!(stop_task1);
1130            let stop_task2 = context.clone().stop(kill2, None);
1131            pin_mut!(stop_task2);
1132
1133            // Both of them should be awaiting completion
1134            assert!(stop_task1.as_mut().now_or_never().is_none());
1135            assert!(stop_task2.as_mut().now_or_never().is_none());
1136
1137            // Wait for both stop calls to complete
1138            assert!(stop_task1.await.is_ok());
1139            assert!(stop_task2.await.is_ok());
1140
1141            // Verify first stop value wins
1142            let sig = context.stopped().await;
1143            assert_eq!(sig.unwrap(), kill1);
1144
1145            // Wait for blocking task to complete
1146            let result = task.await;
1147            assert!(result.is_ok());
1148            assert_eq!(counter.load(Ordering::SeqCst), 1);
1149
1150            // Post-completion stop should return immediately
1151            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1152        });
1153    }
1154
1155    fn test_spawn_ref<R: Runner>(runner: R)
1156    where
1157        R::Context: Spawner,
1158    {
1159        runner.start(|mut context| async move {
1160            let handle = context.spawn_ref();
1161            let result = handle(async move { 42 }).await;
1162            assert!(matches!(result, Ok(42)));
1163        });
1164    }
1165
1166    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1167    where
1168        R::Context: Spawner,
1169    {
1170        runner.start(|mut context| async move {
1171            let handle = context.spawn_ref();
1172            let result = handle(async move { 42 }).await;
1173            assert!(matches!(result, Ok(42)));
1174
1175            // Ensure context is consumed
1176            let handle = context.spawn_ref();
1177            let result = handle(async move { 42 }).await;
1178            assert!(matches!(result, Ok(42)));
1179        });
1180    }
1181
1182    fn test_spawn_duplicate<R: Runner>(runner: R)
1183    where
1184        R::Context: Spawner,
1185    {
1186        runner.start(|mut context| async move {
1187            let handle = context.spawn_ref();
1188            let result = handle(async move { 42 }).await;
1189            assert!(matches!(result, Ok(42)));
1190
1191            // Ensure context is consumed
1192            context.spawn(|_| async move { 42 });
1193        });
1194    }
1195
1196    fn test_spawn_child<R: Runner>(runner: R)
1197    where
1198        R::Context: Spawner + Clock,
1199    {
1200        runner.start(|context| async move {
1201            let child_handle = Arc::new(Mutex::new(None));
1202            let child_handle2 = child_handle.clone();
1203
1204            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1205            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1206            let parent_handle = context.spawn(move |context| async move {
1207                // Spawn child that completes immediately
1208                let handle = context.spawn_child(|_| async {});
1209
1210                // Store child handle so we can test it later
1211                *child_handle2.lock().unwrap() = Some(handle);
1212
1213                parent_initialized_tx.send(()).unwrap();
1214
1215                // Parent task completes
1216                parent_complete_rx.await.unwrap();
1217            });
1218
1219            // Wait for parent task to spawn the children
1220            parent_initialized_rx.await.unwrap();
1221
1222            // Child task completes successfully
1223            let child_handle = child_handle.lock().unwrap().take().unwrap();
1224            assert!(child_handle.await.is_ok());
1225
1226            // Complete the parent task
1227            parent_complete_tx.send(()).unwrap();
1228
1229            // Wait for parent task to complete successfully
1230            assert!(parent_handle.await.is_ok());
1231        });
1232    }
1233
1234    fn test_spawn_child_abort_on_parent_abort<R: Runner>(runner: R)
1235    where
1236        R::Context: Spawner + Clock,
1237    {
1238        runner.start(|context| async move {
1239            let child_handle = Arc::new(Mutex::new(None));
1240            let child_handle2 = child_handle.clone();
1241
1242            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1243            let parent_handle = context.spawn(move |context| async move {
1244                // Spawn child task that hangs forever, should be aborted when parent aborts
1245                let handle = context.spawn_child(|_| pending::<()>());
1246
1247                // Store child task handle so we can test it later
1248                *child_handle2.lock().unwrap() = Some(handle);
1249
1250                parent_initialized_tx.send(()).unwrap();
1251
1252                // Parent task runs until aborted
1253                pending::<()>().await
1254            });
1255
1256            // Wait for parent task to spawn the children
1257            parent_initialized_rx.await.unwrap();
1258
1259            // Abort parent task
1260            parent_handle.abort();
1261            assert!(parent_handle.await.is_err());
1262
1263            // Child task should also resolve with error since its parent aborted
1264            let child_handle = child_handle.lock().unwrap().take().unwrap();
1265            assert!(child_handle.await.is_err());
1266        });
1267    }
1268
1269    fn test_spawn_child_abort_on_parent_completion<R: Runner>(runner: R)
1270    where
1271        R::Context: Spawner + Clock,
1272    {
1273        runner.start(|context| async move {
1274            let child_handle = Arc::new(Mutex::new(None));
1275            let child_handle2 = child_handle.clone();
1276
1277            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1278            let parent_handle = context.spawn(move |context| async move {
1279                // Spawn child task that hangs forever, should be aborted when parent completes
1280                let handle = context.spawn_child(|_| pending::<()>());
1281
1282                // Store child task handle so we can test it later
1283                *child_handle2.lock().unwrap() = Some(handle);
1284
1285                // Parent task completes
1286                parent_complete_rx.await.unwrap();
1287            });
1288
1289            // Fire parent completion
1290            parent_complete_tx.send(()).unwrap();
1291
1292            // Wait for parent task to complete
1293            assert!(parent_handle.await.is_ok());
1294
1295            // Child task should also resolve with error since its parent has completed
1296            let child_handle = child_handle.lock().unwrap().take().unwrap();
1297            assert!(child_handle.await.is_err());
1298        });
1299    }
1300
1301    fn test_spawn_child_cascading_abort<R: Runner>(runner: R)
1302    where
1303        R::Context: Spawner + Clock,
1304    {
1305        runner.start(|context| async move {
1306            // We create the following tree of tasks. All tasks will run
1307            // indefinitely (until aborted).
1308            //
1309            //          root
1310            //     /     |     \
1311            //    /      |      \
1312            //   c0      c1      c2
1313            //  /  \    /  \    /  \
1314            // g0  g1  g2  g3  g4  g5
1315
1316            let handles = Arc::new(Mutex::new(Vec::new()));
1317            let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
1318            let root_task = {
1319                let handles = handles.clone();
1320                context.spawn(move |context| async move {
1321                    for _ in 0..3 {
1322                        let handles2 = handles.clone();
1323                        let mut initialized_tx2 = initialized_tx.clone();
1324                        let handle = context.clone().spawn_child(move |context| async move {
1325                            for _ in 0..2 {
1326                                let handle = context.clone().spawn_child(|_| async {
1327                                    pending::<()>().await;
1328                                });
1329                                handles2.lock().unwrap().push(handle);
1330                                initialized_tx2.send(()).await.unwrap();
1331                            }
1332                            pending::<()>().await;
1333                        });
1334
1335                        handles.lock().unwrap().push(handle);
1336                        initialized_tx.send(()).await.unwrap();
1337                    }
1338
1339                    pending::<()>().await;
1340                })
1341            };
1342
1343            // Wait for tasks to initialize
1344            for _ in 0..9 {
1345                initialized_rx.next().await.unwrap();
1346            }
1347
1348            // Verify we have all 9 handles (3 children + 6 grandchildren)
1349            assert_eq!(handles.lock().unwrap().len(), 9);
1350
1351            // Abort root task
1352            root_task.abort();
1353            assert!(root_task.await.is_err());
1354
1355            // All handles should resolve with error due to cascading abort
1356            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1357            for handle in handles {
1358                assert!(handle.await.is_err());
1359            }
1360        });
1361    }
1362
1363    fn test_child_survives_sibling_completion<R: Runner>(runner: R, use_spawn_ref: bool)
1364    where
1365        R::Context: Spawner + Clock,
1366    {
1367        runner.start(|context| async move {
1368            let (child_started_tx, child_started_rx) = oneshot::channel();
1369            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1370            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1371            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1372            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1373
1374            let parent = context.spawn(move |mut context| async move {
1375                // Spawn a child task
1376                context.clone().spawn_child(|_| async move {
1377                    child_started_tx.send(()).unwrap();
1378                    // Wait for signal to complete
1379                    child_complete_rx.await.unwrap();
1380                });
1381
1382                // Spawn an independent sibling task using spawn or spawn_ref based on parameter
1383                let sibling_task = async move {
1384                    sibling_started_tx.send(()).unwrap();
1385                    // Wait for signal to complete
1386                    sibling_complete_rx.await.unwrap();
1387                };
1388
1389                if use_spawn_ref {
1390                    context.spawn_ref()(sibling_task);
1391                } else {
1392                    context.spawn(|_| sibling_task);
1393                }
1394
1395                // Wait for signal to complete
1396                parent_complete_rx.await.unwrap();
1397            });
1398
1399            // Wait for both to start
1400            child_started_rx.await.unwrap();
1401            sibling_started_rx.await.unwrap();
1402
1403            // Kill the sibling
1404            sibling_complete_tx.send(()).unwrap();
1405
1406            // The child task should still be alive
1407            child_complete_tx.send(()).unwrap();
1408
1409            // As well as the parent
1410            parent_complete_tx.send(()).unwrap();
1411
1412            assert!(parent.await.is_ok());
1413        });
1414    }
1415
1416    fn test_clone_context_no_child_inheritance<R: Runner>(runner: R)
1417    where
1418        R::Context: Spawner + Clock,
1419    {
1420        runner.start(|context| async move {
1421            let (child_started_tx, child_started_rx) = oneshot::channel();
1422            let (child_complete_tx, child_complete_rx) = oneshot::channel();
1423            let (cloned_task_started_tx, cloned_task_started_rx) = oneshot::channel();
1424            let (cloned_task_complete_tx, cloned_task_complete_rx) = oneshot::channel();
1425            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1426
1427            // Parent task that spawns a child using a cloned context
1428            let cloned_context = context.clone();
1429            let parent = cloned_context.spawn(move |context| async move {
1430                // Spawn a child task
1431                context.spawn_child(|_| async move {
1432                    child_started_tx.send(()).unwrap();
1433                    child_complete_rx.await.unwrap();
1434                });
1435
1436                // Wait for parent to complete
1437                parent_complete_rx.await.unwrap();
1438            });
1439
1440            // Use the original context that was previously cloned and spawn a
1441            // task. This task should NOT inherit the child relationship
1442            context.spawn(move |_| async move {
1443                cloned_task_started_tx.send(()).unwrap();
1444                cloned_task_complete_rx.await.unwrap();
1445            });
1446
1447            // Wait for both tasks to start
1448            child_started_rx.await.unwrap();
1449            cloned_task_started_rx.await.unwrap();
1450
1451            // Complete the cloned task, this should NOT affect the child in the
1452            // other context
1453            cloned_task_complete_tx.send(()).unwrap();
1454
1455            // The child should still be alive
1456            child_complete_tx.send(()).unwrap();
1457
1458            // As well as the parent
1459            parent_complete_tx.send(()).unwrap();
1460
1461            assert!(parent.await.is_ok());
1462        });
1463    }
1464
1465    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1466    where
1467        R::Context: Spawner,
1468    {
1469        runner.start(|context| async move {
1470            let handle = context.spawn_blocking(dedicated, |_| 42);
1471            let result = handle.await;
1472            assert!(matches!(result, Ok(42)));
1473        });
1474    }
1475
1476    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1477    where
1478        R::Context: Spawner,
1479    {
1480        runner.start(|mut context| async move {
1481            let spawn = context.spawn_blocking_ref(dedicated);
1482            let handle = spawn(|| 42);
1483            let result = handle.await;
1484            assert!(matches!(result, Ok(42)));
1485        });
1486    }
1487
1488    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1489    where
1490        R::Context: Spawner,
1491    {
1492        runner.start(|mut context| async move {
1493            let spawn = context.spawn_blocking_ref(dedicated);
1494            let result = spawn(|| 42).await;
1495            assert!(matches!(result, Ok(42)));
1496
1497            // The context was already consumed by spawn_blocking_ref, so spawning again should panic
1498            context.spawn_blocking(dedicated, |_| 42);
1499        });
1500    }
1501
1502    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1503    where
1504        R::Context: Spawner,
1505    {
1506        runner.start(|context| async move {
1507            // Create task
1508            let (sender, mut receiver) = oneshot::channel();
1509            let handle = context.spawn_blocking(dedicated, move |_| {
1510                // Wait for abort to be called
1511                loop {
1512                    if receiver.try_recv().is_ok() {
1513                        break;
1514                    }
1515                }
1516
1517                // Perform a long-running operation
1518                let mut count = 0;
1519                loop {
1520                    count += 1;
1521                    if count >= 100_000_000 {
1522                        break;
1523                    }
1524                }
1525                count
1526            });
1527
1528            // Abort the task
1529            //
1530            // If there was an `.await` prior to sending a message over the oneshot, this test
1531            // could deadlock (depending on the runtime implementation) because the blocking task
1532            // would never yield (preventing send from being called).
1533            handle.abort();
1534            sender.send(()).unwrap();
1535
1536            // Wait for the task to complete
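                // The abort does not interrupt the already-running blocking task, so it
                // runs to completion and returns the full count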
1537            assert!(matches!(handle.await, Ok(100_000_000)));
1538        });
1539    }
1540
1541    fn test_metrics<R: Runner>(runner: R)
1542    where
1543        R::Context: Metrics,
1544    {
1545        runner.start(|context| async move {
1546            // Assert label
1547            assert_eq!(context.label(), "");
1548
1549            // Register a metric
1550            let counter = Counter::<u64>::default();
1551            context.register("test", "test", counter.clone());
1552
1553            // Increment the counter
1554            counter.inc();
1555
1556            // Encode metrics
1557            let buffer = context.encode();
1558            assert!(buffer.contains("test_total 1"));
1559
1560            // Nested context: metrics registered here are prefixed with the label
1561            let context = context.with_label("nested");
1562            let nested_counter = Counter::<u64>::default();
1563            context.register("test", "test", nested_counter.clone());
1564
1565            // Increment the counter
1566            nested_counter.inc();
1567
1568            // Encode metrics
1569            let buffer = context.encode();
1570            assert!(buffer.contains("nested_test_total 1"));
1571            assert!(buffer.contains("test_total 1"));
1572        });
1573    }
1574
1575    fn test_metrics_label<R: Runner>(runner: R)
1576    where
1577        R::Context: Metrics,
1578    {
1579        runner.start(|context| async move {
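                // Using the reserved runtime metrics prefix as a label is expected to panic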
1580            context.with_label(METRICS_PREFIX);
1581        })
1582    }
1583
1584    #[test]
1585    fn test_deterministic_error_future() {
1586        let runner = deterministic::Runner::default();
1587        test_error_future(runner);
1588    }
1589
1590    #[test]
1591    fn test_deterministic_clock_sleep() {
1592        let executor = deterministic::Runner::default();
1593        test_clock_sleep(executor);
1594    }
1595
1596    #[test]
1597    fn test_deterministic_clock_sleep_until() {
1598        let executor = deterministic::Runner::default();
1599        test_clock_sleep_until(executor);
1600    }
1601
1602    #[test]
1603    fn test_deterministic_root_finishes() {
1604        let executor = deterministic::Runner::default();
1605        test_root_finishes(executor);
1606    }
1607
1608    #[test]
1609    fn test_deterministic_spawn_abort() {
1610        let executor = deterministic::Runner::default();
1611        test_spawn_abort(executor);
1612    }
1613
1614    #[test]
1615    fn test_deterministic_panic_aborts_root() {
1616        let runner = deterministic::Runner::default();
1617        test_panic_aborts_root(runner);
1618    }
1619
1620    #[test]
1621    #[should_panic(expected = "blah")]
1622    fn test_deterministic_panic_aborts_spawn() {
1623        let executor = deterministic::Runner::default();
1624        test_panic_aborts_spawn(executor);
1625    }
1626
1627    #[test]
1628    fn test_deterministic_select() {
1629        let executor = deterministic::Runner::default();
1630        test_select(executor);
1631    }
1632
1633    #[test]
1634    fn test_deterministic_select_loop() {
1635        let executor = deterministic::Runner::default();
1636        test_select_loop(executor);
1637    }
1638
1639    #[test]
1640    fn test_deterministic_storage_operations() {
1641        let executor = deterministic::Runner::default();
1642        test_storage_operations(executor);
1643    }
1644
1645    #[test]
1646    fn test_deterministic_blob_read_write() {
1647        let executor = deterministic::Runner::default();
1648        test_blob_read_write(executor);
1649    }
1650
1651    #[test]
1652    fn test_deterministic_blob_resize() {
1653        let executor = deterministic::Runner::default();
1654        test_blob_resize(executor);
1655    }
1656
1657    #[test]
1658    fn test_deterministic_many_partition_read_write() {
1659        let executor = deterministic::Runner::default();
1660        test_many_partition_read_write(executor);
1661    }
1662
1663    #[test]
1664    fn test_deterministic_blob_read_past_length() {
1665        let executor = deterministic::Runner::default();
1666        test_blob_read_past_length(executor);
1667    }
1668
1669    #[test]
1670    fn test_deterministic_blob_clone_and_concurrent_read() {
1671        // Run test
1672        let executor = deterministic::Runner::default();
1673        test_blob_clone_and_concurrent_read(executor);
1674    }
1675
1676    #[test]
1677    fn test_deterministic_shutdown() {
1678        let executor = deterministic::Runner::default();
1679        test_shutdown(executor);
1680    }
1681
1682    #[test]
1683    fn test_deterministic_shutdown_multiple_signals() {
1684        let executor = deterministic::Runner::default();
1685        test_shutdown_multiple_signals(executor);
1686    }
1687
1688    #[test]
1689    fn test_deterministic_shutdown_timeout() {
1690        let executor = deterministic::Runner::default();
1691        test_shutdown_timeout(executor);
1692    }
1693
1694    #[test]
1695    fn test_deterministic_shutdown_multiple_stop_calls() {
1696        let executor = deterministic::Runner::default();
1697        test_shutdown_multiple_stop_calls(executor);
1698    }
1699
1700    #[test]
1701    fn test_deterministic_spawn_ref() {
1702        let executor = deterministic::Runner::default();
1703        test_spawn_ref(executor);
1704    }
1705
1706    #[test]
1707    #[should_panic]
1708    fn test_deterministic_spawn_ref_duplicate() {
1709        let executor = deterministic::Runner::default();
1710        test_spawn_ref_duplicate(executor);
1711    }
1712
1713    #[test]
1714    #[should_panic]
1715    fn test_deterministic_spawn_duplicate() {
1716        let executor = deterministic::Runner::default();
1717        test_spawn_duplicate(executor);
1718    }
1719
1720    #[test]
1721    fn test_deterministic_spawn_child() {
1722        let runner = deterministic::Runner::default();
1723        test_spawn_child(runner);
1724    }
1725
1726    #[test]
1727    fn test_deterministic_spawn_child_abort_on_parent_abort() {
1728        let runner = deterministic::Runner::default();
1729        test_spawn_child_abort_on_parent_abort(runner);
1730    }
1731
1732    #[test]
1733    fn test_deterministic_spawn_child_abort_on_parent_completion() {
1734        let runner = deterministic::Runner::default();
1735        test_spawn_child_abort_on_parent_completion(runner);
1736    }
1737
1738    #[test]
1739    fn test_deterministic_spawn_child_cascading_abort() {
1740        let runner = deterministic::Runner::default();
1741        test_spawn_child_cascading_abort(runner);
1742    }
1743
1744    #[test]
1745    fn test_deterministic_child_survives_sibling_completion() {
1746        for use_spawn_ref in [false, true] {
1747            let runner = deterministic::Runner::default();
1748            test_child_survives_sibling_completion(runner, use_spawn_ref);
1749        }
1750    }
1751
1752    #[test]
1753    fn test_deterministic_clone_context_no_child_inheritance() {
1754        let runner = deterministic::Runner::default();
1755        test_clone_context_no_child_inheritance(runner);
1756    }
1757
1758    #[test]
1759    fn test_deterministic_spawn_blocking() {
1760        for dedicated in [false, true] {
1761            let executor = deterministic::Runner::default();
1762            test_spawn_blocking(executor, dedicated);
1763        }
1764    }
1765
1766    #[test]
1767    #[should_panic(expected = "blocking task panicked")]
1768    fn test_deterministic_spawn_blocking_panic() {
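            // The expected panic ends the test during the first iteration, so only
            // dedicated = false is exercised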
1769        for dedicated in [false, true] {
1770            let executor = deterministic::Runner::default();
1771            executor.start(|context| async move {
1772                let handle = context.spawn_blocking(dedicated, |_| {
1773                    panic!("blocking task panicked");
1774                });
1775                handle.await.unwrap();
1776            });
1777        }
1778    }
1779
1780    #[test]
1781    fn test_deterministic_spawn_blocking_abort() {
1782        for dedicated in [false, true] {
1783            let executor = deterministic::Runner::default();
1784            test_spawn_blocking_abort(executor, dedicated);
1785        }
1786    }
1787
1788    #[test]
1789    fn test_deterministic_spawn_blocking_ref() {
1790        for dedicated in [false, true] {
1791            let executor = deterministic::Runner::default();
1792            test_spawn_blocking_ref(executor, dedicated);
1793        }
1794    }
1795
1796    #[test]
1797    #[should_panic]
1798    fn test_deterministic_spawn_blocking_ref_duplicate() {
1799        for dedicated in [false, true] {
1800            let executor = deterministic::Runner::default();
1801            test_spawn_blocking_ref_duplicate(executor, dedicated);
1802        }
1803    }
1804
1805    #[test]
1806    fn test_deterministic_metrics() {
1807        let executor = deterministic::Runner::default();
1808        test_metrics(executor);
1809    }
1810
1811    #[test]
1812    #[should_panic]
1813    fn test_deterministic_metrics_label() {
1814        let executor = deterministic::Runner::default();
1815        test_metrics_label(executor);
1816    }
1817
1818    #[test]
1819    fn test_tokio_error_future() {
1820        let runner = tokio::Runner::default();
1821        test_error_future(runner);
1822    }
1823
1824    #[test]
1825    fn test_tokio_clock_sleep() {
1826        let executor = tokio::Runner::default();
1827        test_clock_sleep(executor);
1828    }
1829
1830    #[test]
1831    fn test_tokio_clock_sleep_until() {
1832        let executor = tokio::Runner::default();
1833        test_clock_sleep_until(executor);
1834    }
1835
1836    #[test]
1837    fn test_tokio_root_finishes() {
1838        let executor = tokio::Runner::default();
1839        test_root_finishes(executor);
1840    }
1841
1842    #[test]
1843    fn test_tokio_spawn_abort() {
1844        let executor = tokio::Runner::default();
1845        test_spawn_abort(executor);
1846    }
1847
1848    #[test]
1849    fn test_tokio_panic_aborts_root() {
1850        let executor = tokio::Runner::default();
1851        test_panic_aborts_root(executor);
1852    }
1853
1854    #[test]
1855    fn test_tokio_panic_aborts_spawn() {
1856        let executor = tokio::Runner::default();
1857        test_panic_aborts_spawn(executor);
1858    }
1859
1860    #[test]
1861    fn test_tokio_select() {
1862        let executor = tokio::Runner::default();
1863        test_select(executor);
1864    }
1865
1866    #[test]
1867    fn test_tokio_select_loop() {
1868        let executor = tokio::Runner::default();
1869        test_select_loop(executor);
1870    }
1871
1872    #[test]
1873    fn test_tokio_storage_operations() {
1874        let executor = tokio::Runner::default();
1875        test_storage_operations(executor);
1876    }
1877
1878    #[test]
1879    fn test_tokio_blob_read_write() {
1880        let executor = tokio::Runner::default();
1881        test_blob_read_write(executor);
1882    }
1883
1884    #[test]
1885    fn test_tokio_blob_resize() {
1886        let executor = tokio::Runner::default();
1887        test_blob_resize(executor);
1888    }
1889
1890    #[test]
1891    fn test_tokio_many_partition_read_write() {
1892        let executor = tokio::Runner::default();
1893        test_many_partition_read_write(executor);
1894    }
1895
1896    #[test]
1897    fn test_tokio_blob_read_past_length() {
1898        let executor = tokio::Runner::default();
1899        test_blob_read_past_length(executor);
1900    }
1901
1902    #[test]
1903    fn test_tokio_blob_clone_and_concurrent_read() {
1904        // Run test
1905        let executor = tokio::Runner::default();
1906        test_blob_clone_and_concurrent_read(executor);
1907    }
1908
1909    #[test]
1910    fn test_tokio_shutdown() {
1911        let executor = tokio::Runner::default();
1912        test_shutdown(executor);
1913    }
1914
1915    #[test]
1916    fn test_tokio_shutdown_multiple_signals() {
1917        let executor = tokio::Runner::default();
1918        test_shutdown_multiple_signals(executor);
1919    }
1920
1921    #[test]
1922    fn test_tokio_shutdown_timeout() {
1923        let executor = tokio::Runner::default();
1924        test_shutdown_timeout(executor);
1925    }
1926
1927    #[test]
1928    fn test_tokio_shutdown_multiple_stop_calls() {
1929        let executor = tokio::Runner::default();
1930        test_shutdown_multiple_stop_calls(executor);
1931    }
1932
1933    #[test]
1934    fn test_tokio_spawn_ref() {
1935        let executor = tokio::Runner::default();
1936        test_spawn_ref(executor);
1937    }
1938
1939    #[test]
1940    #[should_panic]
1941    fn test_tokio_spawn_ref_duplicate() {
1942        let executor = tokio::Runner::default();
1943        test_spawn_ref_duplicate(executor);
1944    }
1945
1946    #[test]
1947    #[should_panic]
1948    fn test_tokio_spawn_duplicate() {
1949        let executor = tokio::Runner::default();
1950        test_spawn_duplicate(executor);
1951    }
1952
1953    #[test]
1954    fn test_tokio_spawn_child() {
1955        let runner = tokio::Runner::default();
1956        test_spawn_child(runner);
1957    }
1958
1959    #[test]
1960    fn test_tokio_spawn_child_abort_on_parent_abort() {
1961        let runner = tokio::Runner::default();
1962        test_spawn_child_abort_on_parent_abort(runner);
1963    }
1964
1965    #[test]
1966    fn test_tokio_spawn_child_abort_on_parent_completion() {
1967        let runner = tokio::Runner::default();
1968        test_spawn_child_abort_on_parent_completion(runner);
1969    }
1970
1971    #[test]
1972    fn test_tokio_spawn_child_cascading_abort() {
1973        let runner = tokio::Runner::default();
1974        test_spawn_child_cascading_abort(runner);
1975    }
1976
1977    #[test]
1978    fn test_tokio_child_survives_sibling_completion() {
1979        for use_spawn_ref in [false, true] {
1980            let runner = tokio::Runner::default();
1981            test_child_survives_sibling_completion(runner, use_spawn_ref);
1982        }
1983    }
1984
1985    #[test]
1986    fn test_tokio_clone_context_no_child_inheritance() {
1987        let runner = tokio::Runner::default();
1988        test_clone_context_no_child_inheritance(runner);
1989    }
1990
1991    #[test]
1992    fn test_tokio_spawn_blocking() {
1993        for dedicated in [false, true] {
1994            let executor = tokio::Runner::default();
1995            test_spawn_blocking(executor, dedicated);
1996        }
1997    }
1998
1999    #[test]
2000    fn test_tokio_spawn_blocking_panic() {
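            // A panic in a blocking task surfaces as an error on the handle here,
            // rather than panicking the test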
2001        for dedicated in [false, true] {
2002            let executor = tokio::Runner::default();
2003            executor.start(|context| async move {
2004                let handle = context.spawn_blocking(dedicated, |_| {
2005                    panic!("blocking task panicked");
2006                });
2007                let result = handle.await;
2008                assert!(matches!(result, Err(Error::Exited)));
2009            });
2010        }
2011    }
2012
2013    #[test]
2014    fn test_tokio_spawn_blocking_abort() {
2015        for dedicated in [false, true] {
2016            let executor = tokio::Runner::default();
2017            test_spawn_blocking_abort(executor, dedicated);
2018        }
2019    }
2020
2021    #[test]
2022    fn test_tokio_spawn_blocking_ref() {
2023        for dedicated in [false, true] {
2024            let executor = tokio::Runner::default();
2025            test_spawn_blocking_ref(executor, dedicated);
2026        }
2027    }
2028
2029    #[test]
2030    #[should_panic]
2031    fn test_tokio_spawn_blocking_ref_duplicate() {
2032        for dedicated in [false, true] {
2033            let executor = tokio::Runner::default();
2034            test_spawn_blocking_ref_duplicate(executor, dedicated);
2035        }
2036    }
2037
2038    #[test]
2039    fn test_tokio_metrics() {
2040        let executor = tokio::Runner::default();
2041        test_metrics(executor);
2042    }
2043
2044    #[test]
2045    #[should_panic]
2046    fn test_tokio_metrics_label() {
2047        let executor = tokio::Runner::default();
2048        test_metrics_label(executor);
2049    }
2050
2051    #[test]
2052    fn test_tokio_process_rss_metric() {
2053        let executor = tokio::Runner::default();
2054        executor.start(|context| async move {
2055            loop {
2056                // Wait for RSS metric to be available
2057                let metrics = context.encode();
2058                if !metrics.contains("runtime_process_rss") {
2059                    context.sleep(Duration::from_millis(100)).await;
2060                    continue;
2061                }
2062
2063                // Verify the RSS value is eventually populated (greater than 0)
2064                for line in metrics.lines() {
2065                    if line.starts_with("runtime_process_rss")
2066                        && !line.starts_with("runtime_process_rss{")
2067                    {
2068                        let parts: Vec<&str> = line.split_whitespace().collect();
2069                        if parts.len() >= 2 {
2070                            let rss_value: i64 =
2071                                parts[1].parse().expect("Failed to parse RSS value");
2072                            if rss_value > 0 {
2073                                return;
2074                            }
2075                        }
2076                    }
2077                }

                    // Avoid busy-spinning while the RSS value is still zero
                    context.sleep(Duration::from_millis(100)).await;
2078            }
2079        });
2080    }
2081
2082    #[test]
2083    fn test_tokio_telemetry() {
2084        let executor = tokio::Runner::default();
2085        executor.start(|context| async move {
2086            // Define the server address
2087            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2088
2089            // Configure telemetry: INFO-level logging and a metrics endpoint at `address`
2090            tokio::telemetry::init(
2091                context.with_label("metrics"),
2092                tokio::telemetry::Logging {
2093                    level: Level::INFO,
2094                    json: false,
2095                },
2096                Some(address),
2097                None,
2098            );
2099
2100            // Register a test metric
2101            let counter: Counter<u64> = Counter::default();
2102            context.register("test_counter", "Test counter", counter.clone());
2103            counter.inc();
2104
2105            // Helper functions to parse HTTP response
2106            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
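                    // Read one byte at a time until '\n', stripping a trailing '\r'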
2107                let mut line = Vec::new();
2108                loop {
2109                    let byte = stream.recv(vec![0; 1]).await?;
2110                    if byte[0] == b'\n' {
2111                        if line.last() == Some(&b'\r') {
2112                            line.pop(); // Remove trailing \r
2113                        }
2114                        break;
2115                    }
2116                    line.push(byte[0]);
2117                }
2118                String::from_utf8(line).map_err(|_| Error::ReadFailed)
2119            }
2120
2121            async fn read_headers<St: Stream>(
2122                stream: &mut St,
2123            ) -> Result<HashMap<String, String>, Error> {
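                    // Collect "Name: value" lines until the blank line that ends the headers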
2124                let mut headers = HashMap::new();
2125                loop {
2126                    let line = read_line(stream).await?;
2127                    if line.is_empty() {
2128                        break;
2129                    }
2130                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
2131                    if parts.len() == 2 {
2132                        headers.insert(parts[0].to_string(), parts[1].to_string());
2133                    }
2134                }
2135                Ok(headers)
2136            }
2137
2138            async fn read_body<St: Stream>(
2139                stream: &mut St,
2140                content_length: usize,
2141            ) -> Result<String, Error> {
2142                let read = stream.recv(vec![0; content_length]).await?;
2143                String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
2144            }
2145
2146            // Simulate a client connecting to the server
2147            let client_handle = context
2148                .with_label("client")
2149                .spawn(move |context| async move {
2150                    let (mut sink, mut stream) = loop {
2151                        match context.dial(address).await {
2152                            Ok((sink, stream)) => break (sink, stream),
2153                            Err(e) => {
2154                                // The client may start before the server is listening; retry after a short delay
2155                                error!(err = ?e, "failed to connect");
2156                                context.sleep(Duration::from_millis(10)).await;
2157                            }
2158                        }
2159                    };
2160
2161                    // Send a GET request to the server
2162                    let request = format!(
2163                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2164                    );
2165                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
2166
2167                    // Read and verify the HTTP status line
2168                    let status_line = read_line(&mut stream).await.unwrap();
2169                    assert_eq!(status_line, "HTTP/1.1 200 OK");
2170
2171                    // Read and parse headers
2172                    let headers = read_headers(&mut stream).await.unwrap();
2173                    println!("Headers: {headers:?}");
2174                    let content_length = headers
2175                        .get("content-length")
2176                        .unwrap()
2177                        .parse::<usize>()
2178                        .unwrap();
2179
2180                    // Read and verify the body
2181                    let body = read_body(&mut stream, content_length).await.unwrap();
2182                    assert!(body.contains("test_counter_total 1"));
2183                });
2184
2185            // Wait for the client task to complete
2186            client_handle.await.unwrap();
2187        });
2188    }
2189}