commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23    future::Future,
24    io::Error as IoError,
25    net::SocketAddr,
26    time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36    if #[cfg(not(target_arch = "wasm32"))] {
37        pub mod tokio;
38        pub mod benchmarks;
39    }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
49/// Prefix for runtime metrics.
50const METRICS_PREFIX: &str = "runtime";
51
52/// Errors that can occur when interacting with the runtime.
53#[derive(Error, Debug)]
54pub enum Error {
55    #[error("exited")]
56    Exited,
57    #[error("closed")]
58    Closed,
59    #[error("timeout")]
60    Timeout,
61    #[error("bind failed")]
62    BindFailed,
63    #[error("connection failed")]
64    ConnectionFailed,
65    #[error("write failed")]
66    WriteFailed,
67    #[error("read failed")]
68    ReadFailed,
69    #[error("send failed")]
70    SendFailed,
71    #[error("recv failed")]
72    RecvFailed,
73    #[error("partition creation failed: {0}")]
74    PartitionCreationFailed(String),
75    #[error("partition missing: {0}")]
76    PartitionMissing(String),
77    #[error("partition corrupt: {0}")]
78    PartitionCorrupt(String),
79    #[error("blob open failed: {0}/{1} error: {2}")]
80    BlobOpenFailed(String, String, IoError),
81    #[error("blob missing: {0}/{1}")]
82    BlobMissing(String, String),
83    #[error("blob resize failed: {0}/{1} error: {2}")]
84    BlobResizeFailed(String, String, IoError),
85    #[error("blob sync failed: {0}/{1} error: {2}")]
86    BlobSyncFailed(String, String, IoError),
87    #[error("blob insufficient length")]
88    BlobInsufficientLength,
89    #[error("offset overflow")]
90    OffsetOverflow,
91    #[error("io error: {0}")]
92    Io(#[from] IoError),
93}
94
95/// Interface that any task scheduler must implement to start
96/// running tasks.
97pub trait Runner {
98    /// Context defines the environment available to tasks.
99    /// Example of possible services provided by the context include:
100    /// - [Clock] for time-based operations
101    /// - [Network] for network operations
102    /// - [Storage] for storage operations
103    type Context;
104
105    /// Start running a root task.
106    fn start<F, Fut>(self, f: F) -> Fut::Output
107    where
108        F: FnOnce(Self::Context) -> Fut,
109        Fut: Future;
110}
111
112/// Interface that any task scheduler must implement to spawn tasks.
113pub trait Spawner: Clone + Send + Sync + 'static {
114    /// Enqueue a task to be executed.
115    ///
116    /// Unlike a future, a spawned task will start executing immediately (even if the caller
117    /// does not await the handle).
118    ///
119    /// Spawned tasks consume the context used to create them. This ensures that context cannot
120    /// be shared between tasks and that a task's context always comes from somewhere.
121    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
122    where
123        F: FnOnce(Self) -> Fut + Send + 'static,
124        Fut: Future<Output = T> + Send + 'static,
125        T: Send + 'static;
126
127    /// Enqueue a task to be executed (without consuming the context).
128    ///
129    /// The semantics are the same as [Spawner::spawn].
130    ///
131    /// # Warning
132    ///
133    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
134    /// to prevent accidental misuse.
135    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
136    where
137        F: Future<Output = T> + Send + 'static,
138        T: Send + 'static;
139
140    /// Enqueue a blocking task to be executed.
141    ///
142    /// This method is designed for synchronous, potentially long-running operations. Tasks can either
143    /// be executed in a shared thread (tasks that are expected to finish on their own) or a dedicated
144    /// thread (tasks that are expected to run indefinitely).
145    ///
146    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
147    /// result.
148    ///
149    /// # Motivation
150    ///
151    /// Most runtimes allocate a limited number of threads for executing async tasks, running whatever
152    /// isn't waiting. If blocking tasks are spawned this way, they can dramatically reduce the efficiency
153    /// of said runtimes.
154    ///
155    /// # Warning
156    ///
157    /// Blocking tasks cannot be aborted.
158    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
159    where
160        F: FnOnce(Self) -> T + Send + 'static,
161        T: Send + 'static;
162
163    /// Enqueue a blocking task to be executed (without consuming the context).
164    ///
165    /// The semantics are the same as [Spawner::spawn_blocking].
166    ///
167    /// # Warning
168    ///
169    /// If this function is used to spawn multiple tasks from the same context,
170    /// the runtime will panic to prevent accidental misuse.
171    fn spawn_blocking_ref<F, T>(
172        &mut self,
173        dedicated: bool,
174    ) -> impl FnOnce(F) -> Handle<T> + 'static
175    where
176        F: FnOnce() -> T + Send + 'static,
177        T: Send + 'static;
178
179    /// Signals the runtime to stop execution and waits for all outstanding tasks
180    /// to perform any required cleanup and exit.
181    ///
182    /// This method does not actually kill any tasks but rather signals to them, using
183    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
184    /// It then waits for all [signal::Signal] references to be dropped before returning.
185    ///
186    /// ## Multiple Stop Calls
187    ///
188    /// This method is idempotent and safe to call multiple times concurrently (on
189    /// different instances of the same context since it consumes `self`). The first
190    /// call initiates shutdown with the provided `value`, and all subsequent calls
191    /// will wait for the same completion regardless of their `value` parameter, i.e.
192    /// the original `value` from the first call is preserved.
193    ///
194    /// ## Timeout
195    ///
196    /// If a timeout is provided, the method will return an error if all [signal::Signal]
197    /// references have not been dropped within the specified duration.
198    fn stop(
199        self,
200        value: i32,
201        timeout: Option<Duration>,
202    ) -> impl Future<Output = Result<(), Error>> + Send;
203
204    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
205    /// any task.
206    ///
207    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
208    /// immediately. The [signal::Signal] returned will always resolve to the value of the
209    /// first [Spawner::stop] call.
210    fn stopped(&self) -> signal::Signal;
211}
212
213/// Interface to register and encode metrics.
214pub trait Metrics: Clone + Send + Sync + 'static {
215    /// Get the current label of the context.
216    fn label(&self) -> String;
217
218    /// Create a new instance of `Metrics` with the given label appended to the end
219    /// of the current `Metrics` label.
220    ///
221    /// This is commonly used to create a nested context for `register`.
222    ///
223    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
224    /// label (reserved for metrics for the runtime).
225    fn with_label(&self, label: &str) -> Self;
226
227    /// Prefix the given label with the current context's label.
228    ///
229    /// Unlike `with_label`, this method does not create a new context.
230    fn scoped_label(&self, label: &str) -> String {
231        let label = if self.label().is_empty() {
232            label.to_string()
233        } else {
234            format!("{}_{}", self.label(), label)
235        };
236        assert!(
237            !label.starts_with(METRICS_PREFIX),
238            "using runtime label is not allowed"
239        );
240        label
241    }
242
243    /// Register a metric with the runtime.
244    ///
245    /// Any registered metric will include (as a prefix) the label of the current context.
246    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
247
248    /// Encode all metrics into a buffer.
249    fn encode(&self) -> String;
250}
251
252/// Interface that any task scheduler must implement to provide
253/// time-based operations.
254///
255/// It is necessary to mock time to provide deterministic execution
256/// of arbitrary tasks.
257pub trait Clock: Clone + Send + Sync + 'static {
258    /// Returns the current time.
259    fn current(&self) -> SystemTime;
260
261    /// Sleep for the given duration.
262    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
263
264    /// Sleep until the given deadline.
265    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
266}
267
268/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
269pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
270
271/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
272pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
273
274/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
275pub type ListenerOf<N> = <N as crate::Network>::Listener;
276
277/// Interface that any runtime must implement to create
278/// network connections.
279pub trait Network: Clone + Send + Sync + 'static {
280    /// The type of [Listener] that's returned when binding to a socket.
281    /// Accepting a connection returns a [Sink] and [Stream] which are defined
282    /// by the [Listener] and used to send and receive data over the connection.
283    type Listener: Listener;
284
285    /// Bind to the given socket address.
286    fn bind(
287        &self,
288        socket: SocketAddr,
289    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
290
291    /// Dial the given socket address.
292    fn dial(
293        &self,
294        socket: SocketAddr,
295    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
296}
297
298/// Interface that any runtime must implement to handle
299/// incoming network connections.
300pub trait Listener: Sync + Send + 'static {
301    /// The type of [Sink] that's returned when accepting a connection.
302    /// This is used to send data to the remote connection.
303    type Sink: Sink;
304    /// The type of [Stream] that's returned when accepting a connection.
305    /// This is used to receive data from the remote connection.
306    type Stream: Stream;
307
308    /// Accept an incoming connection.
309    fn accept(
310        &mut self,
311    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
312
313    /// Returns the local address of the listener.
314    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
315}
316
317/// Interface that any runtime must implement to send
318/// messages over a network connection.
319pub trait Sink: Sync + Send + 'static {
320    /// Send a message to the sink.
321    fn send(
322        &mut self,
323        msg: impl Into<StableBuf> + Send,
324    ) -> impl Future<Output = Result<(), Error>> + Send;
325}
326
327/// Interface that any runtime must implement to receive
328/// messages over a network connection.
329pub trait Stream: Sync + Send + 'static {
330    /// Receive a message from the stream, storing it in the given buffer.
331    /// Reads exactly the number of bytes that fit in the buffer.
332    fn recv(
333        &mut self,
334        buf: impl Into<StableBuf> + Send,
335    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
336}
337
338/// Interface to interact with storage.
339///
340///
341/// To support storage implementations that enable concurrent reads and
342/// writes, blobs are responsible for maintaining synchronization.
343///
344/// Storage can be backed by a local filesystem, cloud storage, etc.
345pub trait Storage: Clone + Send + Sync + 'static {
346    /// The readable/writeable storage buffer that can be opened by this Storage.
347    type Blob: Blob;
348
349    /// Open an existing blob in a given partition or create a new one, returning
350    /// the blob and its length.
351    ///
352    /// Multiple instances of the same blob can be opened concurrently, however,
353    /// writing to the same blob concurrently may lead to undefined behavior.
354    fn open(
355        &self,
356        partition: &str,
357        name: &[u8],
358    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
359
360    /// Remove a blob from a given partition.
361    ///
362    /// If no `name` is provided, the entire partition is removed.
363    fn remove(
364        &self,
365        partition: &str,
366        name: Option<&[u8]>,
367    ) -> impl Future<Output = Result<(), Error>> + Send;
368
369    /// Return all blobs in a given partition.
370    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
371}
372
373/// Interface to read and write to a blob.
374///
375/// To support blob implementations that enable concurrent reads and
376/// writes, blobs are responsible for maintaining synchronization.
377///
378/// Cloning a blob is similar to wrapping a single file descriptor in
379/// a lock whereas opening a new blob (of the same name) is similar to
380/// opening a new file descriptor. If multiple blobs are opened with the same
381/// name, they are not expected to coordinate access to underlying storage
382/// and writing to both is undefined behavior.
383///
384/// When a blob is dropped, any unsynced changes may be discarded. Implementations
385/// may attempt to sync during drop but errors will go unhandled. Call `sync`
386/// before dropping to ensure all changes are durably persisted.
387#[allow(clippy::len_without_is_empty)]
388pub trait Blob: Clone + Send + Sync + 'static {
389    /// Read from the blob at the given offset.
390    ///
391    /// `read_at` does not return the number of bytes read because it
392    /// only returns once the entire buffer has been filled.
393    fn read_at(
394        &self,
395        buf: impl Into<StableBuf> + Send,
396        offset: u64,
397    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
398
399    /// Write `buf` to the blob at the given offset.
400    fn write_at(
401        &self,
402        buf: impl Into<StableBuf> + Send,
403        offset: u64,
404    ) -> impl Future<Output = Result<(), Error>> + Send;
405
406    /// Resize the blob to the given length.
407    ///
408    /// If the length is greater than the current length, the blob is extended with zeros.
409    /// If the length is less than the current length, the blob is resized.
410    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
411
412    /// Ensure all pending data is durably persisted.
413    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use bytes::Bytes;
420    use commonware_macros::select;
421    use futures::{
422        channel::{mpsc, oneshot},
423        future::{pending, ready},
424        join, pin_mut, FutureExt, SinkExt, StreamExt,
425    };
426    use prometheus_client::metrics::counter::Counter;
427    use std::{
428        collections::HashMap,
429        panic::{catch_unwind, AssertUnwindSafe},
430        str::FromStr,
431        sync::{
432            atomic::{AtomicU32, Ordering},
433            Arc, Mutex,
434        },
435    };
436    use tracing::{error, Level};
437    use utils::reschedule;
438
439    fn test_error_future<R: Runner>(runner: R) {
440        async fn error_future() -> Result<&'static str, &'static str> {
441            Err("An error occurred")
442        }
443        let result = runner.start(|_| error_future());
444        assert_eq!(result, Err("An error occurred"));
445    }
446
447    fn test_clock_sleep<R: Runner>(runner: R)
448    where
449        R::Context: Spawner + Clock,
450    {
451        runner.start(|context| async move {
452            // Capture initial time
453            let start = context.current();
454            let sleep_duration = Duration::from_millis(10);
455            context.sleep(sleep_duration).await;
456
457            // After run, time should have advanced
458            let end = context.current();
459            assert!(end.duration_since(start).unwrap() >= sleep_duration);
460        });
461    }
462
463    fn test_clock_sleep_until<R: Runner>(runner: R)
464    where
465        R::Context: Spawner + Clock,
466    {
467        runner.start(|context| async move {
468            // Trigger sleep
469            let now = context.current();
470            context.sleep_until(now + Duration::from_millis(100)).await;
471
472            // Ensure slept duration has elapsed
473            let elapsed = now.elapsed().unwrap();
474            assert!(elapsed >= Duration::from_millis(100));
475        });
476    }
477
478    fn test_root_finishes<R: Runner>(runner: R)
479    where
480        R::Context: Spawner,
481    {
482        runner.start(|context| async move {
483            context.spawn(|_| async move {
484                loop {
485                    reschedule().await;
486                }
487            });
488        });
489    }
490
491    fn test_spawn_abort<R: Runner>(runner: R)
492    where
493        R::Context: Spawner,
494    {
495        runner.start(|context| async move {
496            let handle = context.spawn(|_| async move {
497                loop {
498                    reschedule().await;
499                }
500            });
501            handle.abort();
502            assert!(matches!(handle.await, Err(Error::Closed)));
503        });
504    }
505
506    fn test_panic_aborts_root<R: Runner>(runner: R) {
507        let result = catch_unwind(AssertUnwindSafe(|| {
508            runner.start(|_| async move {
509                panic!("blah");
510            });
511        }));
512        result.unwrap_err();
513    }
514
515    fn test_panic_aborts_spawn<R: Runner>(runner: R)
516    where
517        R::Context: Spawner,
518    {
519        let result = runner.start(|context| async move {
520            let result = context.spawn(|_| async move {
521                panic!("blah");
522            });
523            assert!(matches!(result.await, Err(Error::Exited)));
524            Result::<(), Error>::Ok(())
525        });
526
527        // Ensure panic was caught
528        result.unwrap();
529    }
530
531    fn test_select<R: Runner>(runner: R) {
532        runner.start(|_| async move {
533            // Test first branch
534            let output = Mutex::new(0);
535            select! {
536                v1 = ready(1) => {
537                    *output.lock().unwrap() = v1;
538                },
539                v2 = ready(2) => {
540                    *output.lock().unwrap() = v2;
541                },
542            };
543            assert_eq!(*output.lock().unwrap(), 1);
544
545            // Test second branch
546            select! {
547                v1 = std::future::pending::<i32>() => {
548                    *output.lock().unwrap() = v1;
549                },
550                v2 = ready(2) => {
551                    *output.lock().unwrap() = v2;
552                },
553            };
554            assert_eq!(*output.lock().unwrap(), 2);
555        });
556    }
557
558    /// Ensure future fusing works as expected.
559    fn test_select_loop<R: Runner>(runner: R)
560    where
561        R::Context: Clock,
562    {
563        runner.start(|context| async move {
564            // Should hit timeout
565            let (mut sender, mut receiver) = mpsc::unbounded();
566            for _ in 0..2 {
567                select! {
568                    v = receiver.next() => {
569                        panic!("unexpected value: {v:?}");
570                    },
571                    _ = context.sleep(Duration::from_millis(100)) => {
572                        continue;
573                    },
574                };
575            }
576
577            // Populate channel
578            sender.send(0).await.unwrap();
579            sender.send(1).await.unwrap();
580
581            // Prefer not reading channel without losing messages
582            select! {
583                _ = async {} => {
584                    // Skip reading from channel even though populated
585                },
586                v = receiver.next() => {
587                    panic!("unexpected value: {v:?}");
588                },
589            };
590
591            // Process messages
592            for i in 0..2 {
593                select! {
594                    _ = context.sleep(Duration::from_millis(100)) => {
595                        panic!("timeout");
596                    },
597                    v = receiver.next() => {
598                        assert_eq!(v.unwrap(), i);
599                    },
600                };
601            }
602        });
603    }
604
605    fn test_storage_operations<R: Runner>(runner: R)
606    where
607        R::Context: Storage,
608    {
609        runner.start(|context| async move {
610            let partition = "test_partition";
611            let name = b"test_blob";
612
613            // Open a new blob
614            let (blob, _) = context
615                .open(partition, name)
616                .await
617                .expect("Failed to open blob");
618
619            // Write data to the blob
620            let data = b"Hello, Storage!";
621            blob.write_at(Vec::from(data), 0)
622                .await
623                .expect("Failed to write to blob");
624
625            // Sync the blob
626            blob.sync().await.expect("Failed to sync blob");
627
628            // Read data from the blob
629            let read = blob
630                .read_at(vec![0; data.len()], 0)
631                .await
632                .expect("Failed to read from blob");
633            assert_eq!(read.as_ref(), data);
634
635            // Sync the blob
636            blob.sync().await.expect("Failed to sync blob");
637
638            // Scan blobs in the partition
639            let blobs = context
640                .scan(partition)
641                .await
642                .expect("Failed to scan partition");
643            assert!(blobs.contains(&name.to_vec()));
644
645            // Reopen the blob
646            let (blob, len) = context
647                .open(partition, name)
648                .await
649                .expect("Failed to reopen blob");
650            assert_eq!(len, data.len() as u64);
651
652            // Read data part of message back
653            let read = blob
654                .read_at(vec![0u8; 7], 7)
655                .await
656                .expect("Failed to read data");
657            assert_eq!(read.as_ref(), b"Storage");
658
659            // Sync the blob
660            blob.sync().await.expect("Failed to sync blob");
661
662            // Remove the blob
663            context
664                .remove(partition, Some(name))
665                .await
666                .expect("Failed to remove blob");
667
668            // Ensure the blob is removed
669            let blobs = context
670                .scan(partition)
671                .await
672                .expect("Failed to scan partition");
673            assert!(!blobs.contains(&name.to_vec()));
674
675            // Remove the partition
676            context
677                .remove(partition, None)
678                .await
679                .expect("Failed to remove partition");
680
681            // Scan the partition
682            let result = context.scan(partition).await;
683            assert!(matches!(result, Err(Error::PartitionMissing(_))));
684        });
685    }
686
687    fn test_blob_read_write<R: Runner>(runner: R)
688    where
689        R::Context: Storage,
690    {
691        runner.start(|context| async move {
692            let partition = "test_partition";
693            let name = b"test_blob_rw";
694
695            // Open a new blob
696            let (blob, _) = context
697                .open(partition, name)
698                .await
699                .expect("Failed to open blob");
700
701            // Write data at different offsets
702            let data1 = b"Hello";
703            let data2 = b"World";
704            blob.write_at(Vec::from(data1), 0)
705                .await
706                .expect("Failed to write data1");
707            blob.write_at(Vec::from(data2), 5)
708                .await
709                .expect("Failed to write data2");
710
711            // Read data back
712            let read = blob
713                .read_at(vec![0u8; 10], 0)
714                .await
715                .expect("Failed to read data");
716            assert_eq!(&read.as_ref()[..5], data1);
717            assert_eq!(&read.as_ref()[5..], data2);
718
719            // Read past end of blob
720            let result = blob.read_at(vec![0u8; 10], 10).await;
721            assert!(result.is_err());
722
723            // Rewrite data without affecting length
724            let data3 = b"Store";
725            blob.write_at(Vec::from(data3), 5)
726                .await
727                .expect("Failed to write data3");
728
729            // Read data back
730            let read = blob
731                .read_at(vec![0u8; 10], 0)
732                .await
733                .expect("Failed to read data");
734            assert_eq!(&read.as_ref()[..5], data1);
735            assert_eq!(&read.as_ref()[5..], data3);
736
737            // Read past end of blob
738            let result = blob.read_at(vec![0u8; 10], 10).await;
739            assert!(result.is_err());
740        });
741    }
742
743    fn test_blob_resize<R: Runner>(runner: R)
744    where
745        R::Context: Storage,
746    {
747        runner.start(|context| async move {
748            let partition = "test_partition_resize";
749            let name = b"test_blob_resize";
750
751            // Open and write to a new blob
752            let (blob, _) = context
753                .open(partition, name)
754                .await
755                .expect("Failed to open blob");
756
757            let data = b"some data";
758            blob.write_at(data.to_vec(), 0)
759                .await
760                .expect("Failed to write");
761            blob.sync().await.expect("Failed to sync after write");
762
763            // Re-open and check length
764            let (blob, len) = context.open(partition, name).await.unwrap();
765            assert_eq!(len, data.len() as u64);
766
767            // Resize to extend the file
768            let new_len = (data.len() as u64) * 2;
769            blob.resize(new_len)
770                .await
771                .expect("Failed to resize to extend");
772            blob.sync().await.expect("Failed to sync after resize");
773
774            // Re-open and check length again
775            let (blob, len) = context.open(partition, name).await.unwrap();
776            assert_eq!(len, new_len);
777
778            // Read original data
779            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
780            assert_eq!(read_buf.as_ref(), data);
781
782            // Read extended part (should be zeros)
783            let extended_part = blob
784                .read_at(vec![0; data.len()], data.len() as u64)
785                .await
786                .unwrap();
787            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
788
789            // Truncate the blob
790            blob.resize(data.len() as u64).await.unwrap();
791            blob.sync().await.unwrap();
792
793            // Reopen to check truncation
794            let (blob, size) = context.open(partition, name).await.unwrap();
795            assert_eq!(size, data.len() as u64);
796
797            // Read truncated data
798            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
799            assert_eq!(read_buf.as_ref(), data);
800            blob.sync().await.unwrap();
801        });
802    }
803
804    fn test_many_partition_read_write<R: Runner>(runner: R)
805    where
806        R::Context: Storage,
807    {
808        runner.start(|context| async move {
809            let partitions = ["partition1", "partition2", "partition3"];
810            let name = b"test_blob_rw";
811            let data1 = b"Hello";
812            let data2 = b"World";
813
814            for (additional, partition) in partitions.iter().enumerate() {
815                // Open a new blob
816                let (blob, _) = context
817                    .open(partition, name)
818                    .await
819                    .expect("Failed to open blob");
820
821                // Write data at different offsets
822                blob.write_at(Vec::from(data1), 0)
823                    .await
824                    .expect("Failed to write data1");
825                blob.write_at(Vec::from(data2), 5 + additional as u64)
826                    .await
827                    .expect("Failed to write data2");
828
829                // Sync the blob
830                blob.sync().await.expect("Failed to sync blob");
831            }
832
833            for (additional, partition) in partitions.iter().enumerate() {
834                // Open a new blob
835                let (blob, len) = context
836                    .open(partition, name)
837                    .await
838                    .expect("Failed to open blob");
839                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
840
841                // Read data back
842                let read = blob
843                    .read_at(vec![0u8; 10 + additional], 0)
844                    .await
845                    .expect("Failed to read data");
846                assert_eq!(&read.as_ref()[..5], b"Hello");
847                assert_eq!(&read.as_ref()[5 + additional..], b"World");
848            }
849        });
850    }
851
852    fn test_blob_read_past_length<R: Runner>(runner: R)
853    where
854        R::Context: Storage,
855    {
856        runner.start(|context| async move {
857            let partition = "test_partition";
858            let name = b"test_blob_rw";
859
860            // Open a new blob
861            let (blob, _) = context
862                .open(partition, name)
863                .await
864                .expect("Failed to open blob");
865
866            // Read data past file length (empty file)
867            let result = blob.read_at(vec![0u8; 10], 0).await;
868            assert!(result.is_err());
869
870            // Write data to the blob
871            let data = b"Hello, Storage!".to_vec();
872            blob.write_at(data, 0)
873                .await
874                .expect("Failed to write to blob");
875
876            // Read data past file length (non-empty file)
877            let result = blob.read_at(vec![0u8; 20], 0).await;
878            assert!(result.is_err());
879        })
880    }
881
882    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
883    where
884        R::Context: Spawner + Storage + Metrics,
885    {
886        runner.start(|context| async move {
887            let partition = "test_partition";
888            let name = b"test_blob_rw";
889
890            // Open a new blob
891            let (blob, _) = context
892                .open(partition, name)
893                .await
894                .expect("Failed to open blob");
895
896            // Write data to the blob
897            let data = b"Hello, Storage!";
898            blob.write_at(Vec::from(data), 0)
899                .await
900                .expect("Failed to write to blob");
901
902            // Sync the blob
903            blob.sync().await.expect("Failed to sync blob");
904
905            // Read data from the blob in clone
906            let check1 = context.with_label("check1").spawn({
907                let blob = blob.clone();
908                move |_| async move {
909                    let read = blob
910                        .read_at(vec![0u8; data.len()], 0)
911                        .await
912                        .expect("Failed to read from blob");
913                    assert_eq!(read.as_ref(), data);
914                }
915            });
916            let check2 = context.with_label("check2").spawn({
917                let blob = blob.clone();
918                move |_| async move {
919                    let read = blob
920                        .read_at(vec![0; data.len()], 0)
921                        .await
922                        .expect("Failed to read from blob");
923                    assert_eq!(read.as_ref(), data);
924                }
925            });
926
927            // Wait for both reads to complete
928            let result = join!(check1, check2);
929            assert!(result.0.is_ok());
930            assert!(result.1.is_ok());
931
932            // Read data from the blob
933            let read = blob
934                .read_at(vec![0; data.len()], 0)
935                .await
936                .expect("Failed to read from blob");
937            assert_eq!(read.as_ref(), data);
938
939            // Drop the blob
940            drop(blob);
941
942            // Ensure no blobs still open
943            let buffer = context.encode();
944            assert!(buffer.contains("open_blobs 0"));
945        });
946    }
947
948    fn test_shutdown<R: Runner>(runner: R)
949    where
950        R::Context: Spawner + Metrics + Clock,
951    {
952        let kill = 9;
953        runner.start(|context| async move {
954            // Spawn a task that waits for signal
955            let before = context
956                .with_label("before")
957                .spawn(move |context| async move {
958                    let mut signal = context.stopped();
959                    let value = (&mut signal).await.unwrap();
960                    assert_eq!(value, kill);
961                    drop(signal);
962                });
963
964            // Signal the tasks and wait for them to stop
965            let result = context.clone().stop(kill, None).await;
966            assert!(result.is_ok());
967
968            // Spawn a task after stop is called
969            let after = context
970                .with_label("after")
971                .spawn(move |context| async move {
972                    // A call to `stopped()` after `stop()` resolves immediately
973                    let value = context.stopped().await.unwrap();
974                    assert_eq!(value, kill);
975                });
976
977            // Ensure both tasks complete
978            let result = join!(before, after);
979            assert!(result.0.is_ok());
980            assert!(result.1.is_ok());
981        });
982    }
983
984    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
985    where
986        R::Context: Spawner + Metrics + Clock,
987    {
988        let kill = 42;
989        runner.start(|context| async move {
990            let (started_tx, mut started_rx) = mpsc::channel(3);
991            let counter = Arc::new(AtomicU32::new(0));
992
993            // Spawn 3 tasks that do cleanup work after receiving stop signal
994            // and increment a shared counter
995            let task = |cleanup_duration: Duration| {
996                let context = context.clone();
997                let counter = counter.clone();
998                let mut started_tx = started_tx.clone();
999                context.spawn(move |context| async move {
1000                    // Wait for signal to be acquired
1001                    let mut signal = context.stopped();
1002                    started_tx.send(()).await.unwrap();
1003
1004                    // Increment once killed
1005                    let value = (&mut signal).await.unwrap();
1006                    assert_eq!(value, kill);
1007                    context.sleep(cleanup_duration).await;
1008                    counter.fetch_add(1, Ordering::SeqCst);
1009
1010                    // Wait to drop signal until work has been done
1011                    drop(signal);
1012                })
1013            };
1014
1015            let task1 = task(Duration::from_millis(10));
1016            let task2 = task(Duration::from_millis(20));
1017            let task3 = task(Duration::from_millis(30));
1018
1019            // Give tasks time to start
1020            for _ in 0..3 {
1021                started_rx.next().await.unwrap();
1022            }
1023
1024            // Stop and verify all cleanup completed
1025            context.stop(kill, None).await.unwrap();
1026            assert_eq!(counter.load(Ordering::SeqCst), 3);
1027
1028            // Ensure all tasks completed
1029            let result = join!(task1, task2, task3);
1030            assert!(result.0.is_ok());
1031            assert!(result.1.is_ok());
1032            assert!(result.2.is_ok());
1033        });
1034    }
1035
1036    fn test_shutdown_timeout<R: Runner>(runner: R)
1037    where
1038        R::Context: Spawner + Metrics + Clock,
1039    {
1040        let kill = 42;
1041        runner.start(|context| async move {
1042            // Setup startup coordinator
1043            let (started_tx, started_rx) = oneshot::channel();
1044
1045            // Spawn a task that never completes its cleanup
1046            context.clone().spawn(move |context| async move {
1047                let signal = context.stopped();
1048                started_tx.send(()).unwrap();
1049                pending::<()>().await;
1050                signal.await.unwrap();
1051            });
1052
1053            // Try to stop with a timeout
1054            started_rx.await.unwrap();
1055            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1056
1057            // Assert that we got a timeout error
1058            assert!(matches!(result, Err(Error::Timeout)));
1059        });
1060    }
1061
1062    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1063    where
1064        R::Context: Spawner + Metrics + Clock,
1065    {
1066        let kill1 = 42;
1067        let kill2 = 43;
1068
1069        runner.start(|context| async move {
1070            let (started_tx, started_rx) = oneshot::channel();
1071            let counter = Arc::new(AtomicU32::new(0));
1072
1073            // Spawn a task that delays completion to test timing
1074            let task = context.with_label("blocking_task").spawn({
1075                let counter = counter.clone();
1076                move |context| async move {
1077                    // Wait for signal to be acquired
1078                    let mut signal = context.stopped();
1079                    started_tx.send(()).unwrap();
1080
1081                    // Wait for signal to be resolved
1082                    let value = (&mut signal).await.unwrap();
1083                    assert_eq!(value, kill1);
1084                    context.sleep(Duration::from_millis(50)).await;
1085
1086                    // Increment counter
1087                    counter.fetch_add(1, Ordering::SeqCst);
1088                    drop(signal);
1089                }
1090            });
1091
1092            // Give task time to start
1093            started_rx.await.unwrap();
1094
1095            // Issue two separate stop calls
1096            // The second stop call uses a different stop value that should be ignored
1097            let stop_task1 = context.clone().stop(kill1, None);
1098            pin_mut!(stop_task1);
1099            let stop_task2 = context.clone().stop(kill2, None);
1100            pin_mut!(stop_task2);
1101
1102            // Both of them should be awaiting completion
1103            assert!(stop_task1.as_mut().now_or_never().is_none());
1104            assert!(stop_task2.as_mut().now_or_never().is_none());
1105
1106            // Wait for both stop calls to complete
1107            assert!(stop_task1.await.is_ok());
1108            assert!(stop_task2.await.is_ok());
1109
1110            // Verify first stop value wins
1111            let sig = context.stopped().await;
1112            assert_eq!(sig.unwrap(), kill1);
1113
1114            // Wait for blocking task to complete
1115            let result = task.await;
1116            assert!(result.is_ok());
1117            assert_eq!(counter.load(Ordering::SeqCst), 1);
1118
1119            // Post-completion stop should return immediately
1120            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1121        });
1122    }
1123
1124    fn test_spawn_ref<R: Runner>(runner: R)
1125    where
1126        R::Context: Spawner,
1127    {
1128        runner.start(|mut context| async move {
1129            let handle = context.spawn_ref();
1130            let result = handle(async move { 42 }).await;
1131            assert!(matches!(result, Ok(42)));
1132        });
1133    }
1134
1135    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1136    where
1137        R::Context: Spawner,
1138    {
1139        runner.start(|mut context| async move {
1140            let handle = context.spawn_ref();
1141            let result = handle(async move { 42 }).await;
1142            assert!(matches!(result, Ok(42)));
1143
1144            // Ensure context is consumed
1145            let handle = context.spawn_ref();
1146            let result = handle(async move { 42 }).await;
1147            assert!(matches!(result, Ok(42)));
1148        });
1149    }
1150
1151    fn test_spawn_duplicate<R: Runner>(runner: R)
1152    where
1153        R::Context: Spawner,
1154    {
1155        runner.start(|mut context| async move {
1156            let handle = context.spawn_ref();
1157            let result = handle(async move { 42 }).await;
1158            assert!(matches!(result, Ok(42)));
1159
1160            // Ensure context is consumed
1161            context.spawn(|_| async move { 42 });
1162        });
1163    }
1164
1165    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1166    where
1167        R::Context: Spawner,
1168    {
1169        runner.start(|context| async move {
1170            let handle = context.spawn_blocking(dedicated, |_| 42);
1171            let result = handle.await;
1172            assert!(matches!(result, Ok(42)));
1173        });
1174    }
1175
1176    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1177    where
1178        R::Context: Spawner,
1179    {
1180        runner.start(|mut context| async move {
1181            let spawn = context.spawn_blocking_ref(dedicated);
1182            let handle = spawn(|| 42);
1183            let result = handle.await;
1184            assert!(matches!(result, Ok(42)));
1185        });
1186    }
1187
1188    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1189    where
1190        R::Context: Spawner,
1191    {
1192        runner.start(|mut context| async move {
1193            let spawn = context.spawn_blocking_ref(dedicated);
1194            let result = spawn(|| 42).await;
1195            assert!(matches!(result, Ok(42)));
1196
1197            // Ensure context is consumed
1198            context.spawn_blocking(dedicated, |_| 42);
1199        });
1200    }
1201
1202    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1203    where
1204        R::Context: Spawner,
1205    {
1206        runner.start(|context| async move {
1207            // Create task
1208            let (sender, mut receiver) = oneshot::channel();
1209            let handle = context.spawn_blocking(dedicated, move |_| {
1210                // Wait for abort to be called
1211                loop {
1212                    if receiver.try_recv().is_ok() {
1213                        break;
1214                    }
1215                }
1216
1217                // Perform a long-running operation
1218                let mut count = 0;
1219                loop {
1220                    count += 1;
1221                    if count >= 100_000_000 {
1222                        break;
1223                    }
1224                }
1225                count
1226            });
1227
1228            // Abort the task
1229            //
1230            // If there was an `.await` prior to sending a message over the oneshot, this test
1231            // could deadlock (depending on the runtime implementation) because the blocking task
1232            // would never yield (preventing send from being called).
1233            handle.abort();
1234            sender.send(()).unwrap();
1235
1236            // Wait for the task to complete
1237            assert!(matches!(handle.await, Ok(100_000_000)));
1238        });
1239    }
1240
1241    fn test_metrics<R: Runner>(runner: R)
1242    where
1243        R::Context: Metrics,
1244    {
1245        runner.start(|context| async move {
1246            // Assert label
1247            assert_eq!(context.label(), "");
1248
1249            // Register a metric
1250            let counter = Counter::<u64>::default();
1251            context.register("test", "test", counter.clone());
1252
1253            // Increment the counter
1254            counter.inc();
1255
1256            // Encode metrics
1257            let buffer = context.encode();
1258            assert!(buffer.contains("test_total 1"));
1259
1260            // Nested context
1261            let context = context.with_label("nested");
1262            let nested_counter = Counter::<u64>::default();
1263            context.register("test", "test", nested_counter.clone());
1264
1265            // Increment the counter
1266            nested_counter.inc();
1267
1268            // Encode metrics
1269            let buffer = context.encode();
1270            assert!(buffer.contains("nested_test_total 1"));
1271            assert!(buffer.contains("test_total 1"));
1272        });
1273    }
1274
1275    fn test_metrics_label<R: Runner>(runner: R)
1276    where
1277        R::Context: Metrics,
1278    {
1279        runner.start(|context| async move {
1280            context.with_label(METRICS_PREFIX);
1281        })
1282    }
1283
1284    #[test]
1285    fn test_deterministic_future() {
1286        let runner = deterministic::Runner::default();
1287        test_error_future(runner);
1288    }
1289
1290    #[test]
1291    fn test_deterministic_clock_sleep() {
1292        let executor = deterministic::Runner::default();
1293        test_clock_sleep(executor);
1294    }
1295
1296    #[test]
1297    fn test_deterministic_clock_sleep_until() {
1298        let executor = deterministic::Runner::default();
1299        test_clock_sleep_until(executor);
1300    }
1301
1302    #[test]
1303    fn test_deterministic_root_finishes() {
1304        let executor = deterministic::Runner::default();
1305        test_root_finishes(executor);
1306    }
1307
1308    #[test]
1309    fn test_deterministic_spawn_abort() {
1310        let executor = deterministic::Runner::default();
1311        test_spawn_abort(executor);
1312    }
1313
1314    #[test]
1315    fn test_deterministic_panic_aborts_root() {
1316        let runner = deterministic::Runner::default();
1317        test_panic_aborts_root(runner);
1318    }
1319
1320    #[test]
1321    #[should_panic(expected = "blah")]
1322    fn test_deterministic_panic_aborts_spawn() {
1323        let executor = deterministic::Runner::default();
1324        test_panic_aborts_spawn(executor);
1325    }
1326
1327    #[test]
1328    fn test_deterministic_select() {
1329        let executor = deterministic::Runner::default();
1330        test_select(executor);
1331    }
1332
1333    #[test]
1334    fn test_deterministic_select_loop() {
1335        let executor = deterministic::Runner::default();
1336        test_select_loop(executor);
1337    }
1338
1339    #[test]
1340    fn test_deterministic_storage_operations() {
1341        let executor = deterministic::Runner::default();
1342        test_storage_operations(executor);
1343    }
1344
1345    #[test]
1346    fn test_deterministic_blob_read_write() {
1347        let executor = deterministic::Runner::default();
1348        test_blob_read_write(executor);
1349    }
1350
1351    #[test]
1352    fn test_deterministic_blob_resize() {
1353        let executor = deterministic::Runner::default();
1354        test_blob_resize(executor);
1355    }
1356
1357    #[test]
1358    fn test_deterministic_many_partition_read_write() {
1359        let executor = deterministic::Runner::default();
1360        test_many_partition_read_write(executor);
1361    }
1362
1363    #[test]
1364    fn test_deterministic_blob_read_past_length() {
1365        let executor = deterministic::Runner::default();
1366        test_blob_read_past_length(executor);
1367    }
1368
1369    #[test]
1370    fn test_deterministic_blob_clone_and_concurrent_read() {
1371        // Run test
1372        let executor = deterministic::Runner::default();
1373        test_blob_clone_and_concurrent_read(executor);
1374    }
1375
1376    #[test]
1377    fn test_deterministic_shutdown() {
1378        let executor = deterministic::Runner::default();
1379        test_shutdown(executor);
1380    }
1381
1382    #[test]
1383    fn test_deterministic_shutdown_multiple_signals() {
1384        let executor = deterministic::Runner::default();
1385        test_shutdown_multiple_signals(executor);
1386    }
1387
1388    #[test]
1389    fn test_deterministic_shutdown_timeout() {
1390        let executor = deterministic::Runner::default();
1391        test_shutdown_timeout(executor);
1392    }
1393
1394    #[test]
1395    fn test_deterministic_shutdown_multiple_stop_calls() {
1396        let executor = deterministic::Runner::default();
1397        test_shutdown_multiple_stop_calls(executor);
1398    }
1399
1400    #[test]
1401    fn test_deterministic_spawn_ref() {
1402        let executor = deterministic::Runner::default();
1403        test_spawn_ref(executor);
1404    }
1405
1406    #[test]
1407    #[should_panic]
1408    fn test_deterministic_spawn_ref_duplicate() {
1409        let executor = deterministic::Runner::default();
1410        test_spawn_ref_duplicate(executor);
1411    }
1412
1413    #[test]
1414    #[should_panic]
1415    fn test_deterministic_spawn_duplicate() {
1416        let executor = deterministic::Runner::default();
1417        test_spawn_duplicate(executor);
1418    }
1419
1420    #[test]
1421    fn test_deterministic_spawn_blocking() {
1422        for dedicated in [false, true] {
1423            let executor = deterministic::Runner::default();
1424            test_spawn_blocking(executor, dedicated);
1425        }
1426    }
1427
1428    #[test]
1429    #[should_panic(expected = "blocking task panicked")]
1430    fn test_deterministic_spawn_blocking_panic() {
1431        for dedicated in [false, true] {
1432            let executor = deterministic::Runner::default();
1433            executor.start(|context| async move {
1434                let handle = context.spawn_blocking(dedicated, |_| {
1435                    panic!("blocking task panicked");
1436                });
1437                handle.await.unwrap();
1438            });
1439        }
1440    }
1441
1442    #[test]
1443    fn test_deterministic_spawn_blocking_abort() {
1444        for dedicated in [false, true] {
1445            let executor = deterministic::Runner::default();
1446            test_spawn_blocking_abort(executor, dedicated);
1447        }
1448    }
1449
1450    #[test]
1451    fn test_deterministic_spawn_blocking_ref() {
1452        for dedicated in [false, true] {
1453            let executor = deterministic::Runner::default();
1454            test_spawn_blocking_ref(executor, dedicated);
1455        }
1456    }
1457
1458    #[test]
1459    #[should_panic]
1460    fn test_deterministic_spawn_blocking_ref_duplicate() {
1461        for dedicated in [false, true] {
1462            let executor = deterministic::Runner::default();
1463            test_spawn_blocking_ref_duplicate(executor, dedicated);
1464        }
1465    }
1466
1467    #[test]
1468    fn test_deterministic_metrics() {
1469        let executor = deterministic::Runner::default();
1470        test_metrics(executor);
1471    }
1472
1473    #[test]
1474    #[should_panic]
1475    fn test_deterministic_metrics_label() {
1476        let executor = deterministic::Runner::default();
1477        test_metrics_label(executor);
1478    }
1479
1480    #[test]
1481    fn test_tokio_error_future() {
1482        let runner = tokio::Runner::default();
1483        test_error_future(runner);
1484    }
1485
1486    #[test]
1487    fn test_tokio_clock_sleep() {
1488        let executor = tokio::Runner::default();
1489        test_clock_sleep(executor);
1490    }
1491
1492    #[test]
1493    fn test_tokio_clock_sleep_until() {
1494        let executor = tokio::Runner::default();
1495        test_clock_sleep_until(executor);
1496    }
1497
1498    #[test]
1499    fn test_tokio_root_finishes() {
1500        let executor = tokio::Runner::default();
1501        test_root_finishes(executor);
1502    }
1503
1504    #[test]
1505    fn test_tokio_spawn_abort() {
1506        let executor = tokio::Runner::default();
1507        test_spawn_abort(executor);
1508    }
1509
1510    #[test]
1511    fn test_tokio_panic_aborts_root() {
1512        let executor = tokio::Runner::default();
1513        test_panic_aborts_root(executor);
1514    }
1515
1516    #[test]
1517    fn test_tokio_panic_aborts_spawn() {
1518        let executor = tokio::Runner::default();
1519        test_panic_aborts_spawn(executor);
1520    }
1521
1522    #[test]
1523    fn test_tokio_select() {
1524        let executor = tokio::Runner::default();
1525        test_select(executor);
1526    }
1527
1528    #[test]
1529    fn test_tokio_select_loop() {
1530        let executor = tokio::Runner::default();
1531        test_select_loop(executor);
1532    }
1533
1534    #[test]
1535    fn test_tokio_storage_operations() {
1536        let executor = tokio::Runner::default();
1537        test_storage_operations(executor);
1538    }
1539
1540    #[test]
1541    fn test_tokio_blob_read_write() {
1542        let executor = tokio::Runner::default();
1543        test_blob_read_write(executor);
1544    }
1545
1546    #[test]
1547    fn test_tokio_blob_resize() {
1548        let executor = tokio::Runner::default();
1549        test_blob_resize(executor);
1550    }
1551
1552    #[test]
1553    fn test_tokio_many_partition_read_write() {
1554        let executor = tokio::Runner::default();
1555        test_many_partition_read_write(executor);
1556    }
1557
1558    #[test]
1559    fn test_tokio_blob_read_past_length() {
1560        let executor = tokio::Runner::default();
1561        test_blob_read_past_length(executor);
1562    }
1563
1564    #[test]
1565    fn test_tokio_blob_clone_and_concurrent_read() {
1566        // Run test
1567        let executor = tokio::Runner::default();
1568        test_blob_clone_and_concurrent_read(executor);
1569    }
1570
1571    #[test]
1572    fn test_tokio_shutdown() {
1573        let executor = tokio::Runner::default();
1574        test_shutdown(executor);
1575    }
1576
1577    #[test]
1578    fn test_tokio_shutdown_multiple_signals() {
1579        let executor = tokio::Runner::default();
1580        test_shutdown_multiple_signals(executor);
1581    }
1582
1583    #[test]
1584    fn test_tokio_shutdown_timeout() {
1585        let executor = tokio::Runner::default();
1586        test_shutdown_timeout(executor);
1587    }
1588
1589    #[test]
1590    fn test_tokio_shutdown_multiple_stop_calls() {
1591        let executor = tokio::Runner::default();
1592        test_shutdown_multiple_stop_calls(executor);
1593    }
1594
1595    #[test]
1596    fn test_tokio_spawn_ref() {
1597        let executor = tokio::Runner::default();
1598        test_spawn_ref(executor);
1599    }
1600
1601    #[test]
1602    #[should_panic]
1603    fn test_tokio_spawn_ref_duplicate() {
1604        let executor = tokio::Runner::default();
1605        test_spawn_ref_duplicate(executor);
1606    }
1607
1608    #[test]
1609    #[should_panic]
1610    fn test_tokio_spawn_duplicate() {
1611        let executor = tokio::Runner::default();
1612        test_spawn_duplicate(executor);
1613    }
1614
1615    #[test]
1616    fn test_tokio_spawn_blocking() {
1617        for dedicated in [false, true] {
1618            let executor = tokio::Runner::default();
1619            test_spawn_blocking(executor, dedicated);
1620        }
1621    }
1622
1623    #[test]
1624    fn test_tokio_spawn_blocking_panic() {
1625        for dedicated in [false, true] {
1626            let executor = tokio::Runner::default();
1627            executor.start(|context| async move {
1628                let handle = context.spawn_blocking(dedicated, |_| {
1629                    panic!("blocking task panicked");
1630                });
1631                let result = handle.await;
1632                assert!(matches!(result, Err(Error::Exited)));
1633            });
1634        }
1635    }
1636
1637    #[test]
1638    fn test_tokio_spawn_blocking_abort() {
1639        for dedicated in [false, true] {
1640            let executor = tokio::Runner::default();
1641            test_spawn_blocking_abort(executor, dedicated);
1642        }
1643    }
1644
1645    #[test]
1646    fn test_tokio_spawn_blocking_ref() {
1647        for dedicated in [false, true] {
1648            let executor = tokio::Runner::default();
1649            test_spawn_blocking_ref(executor, dedicated);
1650        }
1651    }
1652
1653    #[test]
1654    #[should_panic]
1655    fn test_tokio_spawn_blocking_ref_duplicate() {
1656        for dedicated in [false, true] {
1657            let executor = tokio::Runner::default();
1658            test_spawn_blocking_ref_duplicate(executor, dedicated);
1659        }
1660    }
1661
1662    #[test]
1663    fn test_tokio_metrics() {
1664        let executor = tokio::Runner::default();
1665        test_metrics(executor);
1666    }
1667
1668    #[test]
1669    #[should_panic]
1670    fn test_tokio_metrics_label() {
1671        let executor = tokio::Runner::default();
1672        test_metrics_label(executor);
1673    }
1674
1675    #[test]
1676    fn test_tokio_telemetry() {
1677        let executor = tokio::Runner::default();
1678        executor.start(|context| async move {
1679            // Define the server address
1680            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1681
1682            // Configure telemetry
1683            tokio::telemetry::init(
1684                context.with_label("metrics"),
1685                tokio::telemetry::Logging {
1686                    level: Level::INFO,
1687                    json: false,
1688                },
1689                Some(address),
1690                None,
1691            );
1692
1693            // Register a test metric
1694            let counter: Counter<u64> = Counter::default();
1695            context.register("test_counter", "Test counter", counter.clone());
1696            counter.inc();
1697
1698            // Helper functions to parse HTTP response
1699            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1700                let mut line = Vec::new();
1701                loop {
1702                    let byte = stream.recv(vec![0; 1]).await?;
1703                    if byte[0] == b'\n' {
1704                        if line.last() == Some(&b'\r') {
1705                            line.pop(); // Remove trailing \r
1706                        }
1707                        break;
1708                    }
1709                    line.push(byte[0]);
1710                }
1711                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1712            }
1713
1714            async fn read_headers<St: Stream>(
1715                stream: &mut St,
1716            ) -> Result<HashMap<String, String>, Error> {
1717                let mut headers = HashMap::new();
1718                loop {
1719                    let line = read_line(stream).await?;
1720                    if line.is_empty() {
1721                        break;
1722                    }
1723                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1724                    if parts.len() == 2 {
1725                        headers.insert(parts[0].to_string(), parts[1].to_string());
1726                    }
1727                }
1728                Ok(headers)
1729            }
1730
1731            async fn read_body<St: Stream>(
1732                stream: &mut St,
1733                content_length: usize,
1734            ) -> Result<String, Error> {
1735                let read = stream.recv(vec![0; content_length]).await?;
1736                String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
1737            }
1738
1739            // Simulate a client connecting to the server
1740            let client_handle = context
1741                .with_label("client")
1742                .spawn(move |context| async move {
1743                    let (mut sink, mut stream) = loop {
1744                        match context.dial(address).await {
1745                            Ok((sink, stream)) => break (sink, stream),
1746                            Err(e) => {
1747                                // The client may be polled before the server is ready, that's alright!
1748                                error!(err =?e, "failed to connect");
1749                                context.sleep(Duration::from_millis(10)).await;
1750                            }
1751                        }
1752                    };
1753
1754                    // Send a GET request to the server
1755                    let request = format!(
1756                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
1757                    );
1758                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
1759
1760                    // Read and verify the HTTP status line
1761                    let status_line = read_line(&mut stream).await.unwrap();
1762                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1763
1764                    // Read and parse headers
1765                    let headers = read_headers(&mut stream).await.unwrap();
1766                    println!("Headers: {headers:?}");
1767                    let content_length = headers
1768                        .get("content-length")
1769                        .unwrap()
1770                        .parse::<usize>()
1771                        .unwrap();
1772
1773                    // Read and verify the body
1774                    let body = read_body(&mut stream, content_length).await.unwrap();
1775                    assert!(body.contains("test_counter_total 1"));
1776                });
1777
1778            // Wait for the client task to complete
1779            client_handle.await.unwrap();
1780        });
1781    }
1782}