commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23    future::Future,
24    io::Error as IoError,
25    net::SocketAddr,
26    time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
// `#[macro_use]` makes the macros defined in `macros` textually available to the
// modules declared after this point.
#[macro_use]
mod macros;

pub mod deterministic;
pub mod mocks;
// The Tokio-backed runtime and the benchmarks are not compiled for `wasm32` targets.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        pub mod tokio;
        pub mod benchmarks;
    }
}
mod network;
mod process;
mod storage;
pub mod telemetry;
mod utils;
// Every public item of `utils` is re-exported from the crate root.
pub use utils::*;
// Only compiled when at least one of the `iouring-*` features is enabled.
#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
mod iouring;
49
/// Prefix reserved for metrics emitted by the runtime itself.
///
/// [Metrics::scoped_label] panics if a scoped label would start with this prefix.
const METRICS_PREFIX: &str = "runtime";
52
/// Errors that can occur when interacting with the runtime.
///
/// Variants that carry an [IoError] preserve the underlying I/O failure so callers
/// can inspect it; the remaining variants only identify which operation failed.
#[derive(Error, Debug)]
pub enum Error {
    /// A task exited without producing a result (for example, a spawned task that panicked).
    #[error("exited")]
    Exited,
    /// A task or resource was closed (for example, a task aborted through its handle).
    #[error("closed")]
    Closed,
    /// An operation did not complete in time.
    #[error("timeout")]
    Timeout,
    /// Binding to a socket address failed.
    #[error("bind failed")]
    BindFailed,
    /// Establishing a connection failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// Sending a message failed.
    #[error("send failed")]
    SendFailed,
    /// Receiving a message failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created (the payload is presumably the partition name).
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// The requested partition does not exist (the payload is presumably the partition name).
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt (the payload is presumably the partition name).
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// A blob could not be opened. The payload is presumably the partition, the blob name,
    /// and the underlying I/O error (rendered as `{partition}/{name} error: {error}`).
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// The requested blob does not exist (payload presumably partition and blob name).
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// A blob could not be resized (payload presumably partition, blob name, and I/O error).
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    /// A blob could not be synced (payload presumably partition, blob name, and I/O error).
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// A blob is shorter than an operation required.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
    /// Any other I/O error. `From<IoError>` is implemented via `#[from]`, so `?` converts
    /// an [IoError] into this variant.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
95
/// Interface that any task scheduler must implement to start
/// running tasks.
pub trait Runner {
    /// Context defines the environment available to tasks.
    /// Example of possible services provided by the context include:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Start running a root task.
    ///
    /// `f` receives the runtime's [Runner::Context] and returns the root future. The
    /// runner is consumed and the output of the root future is returned to the caller.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
112
/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Enqueue a task to be executed.
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// Spawned tasks consume the context used to create them. This ensures that context cannot
    /// be shared between tasks and that a task's context always comes from somewhere.
    ///
    /// The closure `f` is handed the consumed context and returns the future to run. The
    /// returned [Handle] can be awaited to obtain the task's output.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a task to be executed (without consuming the context).
    ///
    /// The semantics are the same as [Spawner::spawn].
    ///
    /// Rather than taking the future directly, this method returns a closure that
    /// accepts the future and yields its [Handle].
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
    /// to prevent accidental misuse.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed.
    ///
    /// This method is designed for synchronous, potentially long-running operations. Tasks can either
    /// be executed in a shared thread (tasks that are expected to finish on their own) or a dedicated
    /// thread (tasks that are expected to run indefinitely). The `dedicated` flag selects between
    /// the two.
    ///
    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
    /// result.
    ///
    /// # Motivation
    ///
    /// Most runtimes allocate a limited number of threads for executing async tasks, running whatever
    /// isn't waiting. If blocking tasks are spawned this way, they can dramatically reduce the efficiency
    /// of said runtimes.
    ///
    /// # Warning
    ///
    /// Blocking tasks cannot be aborted.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed (without consuming the context).
    ///
    /// The semantics are the same as [Spawner::spawn_blocking].
    ///
    /// Rather than taking the task directly, this method returns a closure that
    /// accepts the task and yields its [Handle]. Unlike [Spawner::spawn_blocking], the
    /// task itself takes no arguments (it is not handed a context).
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context,
    /// the runtime will panic to prevent accidental misuse.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals the runtime to stop execution and waits for all outstanding tasks
    /// to perform any required cleanup and exit.
    ///
    /// This method does not actually kill any tasks but rather signals to them, using
    /// the [signal::Signal] returned by [Spawner::stopped], that they should exit.
    /// It then waits for all [signal::Signal] references to be dropped before returning.
    ///
    /// ## Multiple Stop Calls
    ///
    /// This method is idempotent and safe to call multiple times concurrently (on
    /// different instances of the same context since it consumes `self`). The first
    /// call initiates shutdown with the provided `value`, and all subsequent calls
    /// will wait for the same completion regardless of their `value` parameter, i.e.
    /// the original `value` from the first call is preserved.
    ///
    /// ## Timeout
    ///
    /// If a timeout is provided, the method will return an error if all [signal::Signal]
    /// references have not been dropped within the specified duration. If `timeout` is
    /// `None`, no deadline is specified.
    fn stop(
        self,
        value: i32,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns an instance of a [signal::Signal] that resolves when [Spawner::stop] is called by
    /// any task.
    ///
    /// If [Spawner::stop] has already been called, the [signal::Signal] returned will resolve
    /// immediately. The [signal::Signal] returned will always resolve to the value of the
    /// first [Spawner::stop] call.
    fn stopped(&self) -> signal::Signal;
}
213
214/// Interface to register and encode metrics.
215pub trait Metrics: Clone + Send + Sync + 'static {
216    /// Get the current label of the context.
217    fn label(&self) -> String;
218
219    /// Create a new instance of `Metrics` with the given label appended to the end
220    /// of the current `Metrics` label.
221    ///
222    /// This is commonly used to create a nested context for `register`.
223    ///
224    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
225    /// label (reserved for metrics for the runtime).
226    fn with_label(&self, label: &str) -> Self;
227
228    /// Prefix the given label with the current context's label.
229    ///
230    /// Unlike `with_label`, this method does not create a new context.
231    fn scoped_label(&self, label: &str) -> String {
232        let label = if self.label().is_empty() {
233            label.to_string()
234        } else {
235            format!("{}_{}", self.label(), label)
236        };
237        assert!(
238            !label.starts_with(METRICS_PREFIX),
239            "using runtime label is not allowed"
240        );
241        label
242    }
243
244    /// Register a metric with the runtime.
245    ///
246    /// Any registered metric will include (as a prefix) the label of the current context.
247    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
248
249    /// Encode all metrics into a buffer.
250    fn encode(&self) -> String;
251}
252
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time.
    ///
    /// This is the time as seen by the runtime, which may be mocked.
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    ///
    /// The returned future is `'static`, so it does not borrow the context.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    ///
    /// The returned future is `'static`, so it does not borrow the context.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
268
/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
///
/// Resolved through the [Listener] associated with `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;

/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
///
/// Resolved through the [Listener] associated with `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;

/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
pub type ListenerOf<N> = <N as crate::Network>::Listener;
277
/// Interface that any runtime must implement to create
/// network connections.
pub trait Network: Clone + Send + Sync + 'static {
    /// The type of [Listener] that's returned when binding to a socket.
    /// Accepting a connection returns a [Sink] and [Stream] which are defined
    /// by the [Listener] and used to send and receive data over the connection.
    type Listener: Listener;

    /// Bind to the given socket address.
    ///
    /// On success, resolves to a [Network::Listener] that can accept incoming connections.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// On success, resolves to the [Sink] and [Stream] halves of the connection (the same
    /// types produced when the [Network::Listener] accepts a connection).
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
298
/// Interface that any runtime must implement to handle
/// incoming network connections.
pub trait Listener: Sync + Send + 'static {
    /// The type of [Sink] that's returned when accepting a connection.
    /// This is used to send data to the remote connection.
    type Sink: Sink;
    /// The type of [Stream] that's returned when accepting a connection.
    /// This is used to receive data from the remote connection.
    type Stream: Stream;

    /// Accept an incoming connection.
    ///
    /// On success, resolves to a socket address (presumably that of the remote peer)
    /// together with the [Sink] and [Stream] halves of the connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the local address of the listener.
    ///
    /// Unlike the other methods of this trait, failures are reported as a plain
    /// `std::io::Error` rather than the runtime's [Error].
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
317
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// `msg` is taken by value: anything convertible into a [StableBuf] is accepted.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
327
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer.
    ///
    /// `buf` is taken by value and a [StableBuf] is handed back on success
    /// (presumably the same buffer, now filled).
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
338
/// Interface to interact with storage.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage: Clone + Send + Sync + 'static {
    /// The readable/writeable storage buffer that can be opened by this Storage.
    type Blob: Blob;

    /// Open an existing blob in a given partition or create a new one, returning
    /// the blob and its length.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return all blobs in a given partition.
    ///
    /// Each entry of the returned vector is the name of one blob.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
373
374/// Interface to read and write to a blob.
375///
376/// To support blob implementations that enable concurrent reads and
377/// writes, blobs are responsible for maintaining synchronization.
378///
379/// Cloning a blob is similar to wrapping a single file descriptor in
380/// a lock whereas opening a new blob (of the same name) is similar to
381/// opening a new file descriptor. If multiple blobs are opened with the same
382/// name, they are not expected to coordinate access to underlying storage
383/// and writing to both is undefined behavior.
384///
385/// When a blob is dropped, any unsynced changes may be discarded. Implementations
386/// may attempt to sync during drop but errors will go unhandled. Call `sync`
387/// before dropping to ensure all changes are durably persisted.
388#[allow(clippy::len_without_is_empty)]
389pub trait Blob: Clone + Send + Sync + 'static {
390    /// Read from the blob at the given offset.
391    ///
392    /// `read_at` does not return the number of bytes read because it
393    /// only returns once the entire buffer has been filled.
394    fn read_at(
395        &self,
396        buf: impl Into<StableBuf> + Send,
397        offset: u64,
398    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
399
400    /// Write `buf` to the blob at the given offset.
401    fn write_at(
402        &self,
403        buf: impl Into<StableBuf> + Send,
404        offset: u64,
405    ) -> impl Future<Output = Result<(), Error>> + Send;
406
407    /// Resize the blob to the given length.
408    ///
409    /// If the length is greater than the current length, the blob is extended with zeros.
410    /// If the length is less than the current length, the blob is resized.
411    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
412
413    /// Ensure all pending data is durably persisted.
414    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
415}
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use bytes::Bytes;
421    use commonware_macros::select;
422    use futures::{
423        channel::{mpsc, oneshot},
424        future::{pending, ready},
425        join, pin_mut, FutureExt, SinkExt, StreamExt,
426    };
427    use prometheus_client::metrics::counter::Counter;
428    use std::{
429        collections::HashMap,
430        panic::{catch_unwind, AssertUnwindSafe},
431        str::FromStr,
432        sync::{
433            atomic::{AtomicU32, Ordering},
434            Arc, Mutex,
435        },
436    };
437    use tracing::{error, Level};
438    use utils::reschedule;
439
440    fn test_error_future<R: Runner>(runner: R) {
441        async fn error_future() -> Result<&'static str, &'static str> {
442            Err("An error occurred")
443        }
444        let result = runner.start(|_| error_future());
445        assert_eq!(result, Err("An error occurred"));
446    }
447
448    fn test_clock_sleep<R: Runner>(runner: R)
449    where
450        R::Context: Spawner + Clock,
451    {
452        runner.start(|context| async move {
453            // Capture initial time
454            let start = context.current();
455            let sleep_duration = Duration::from_millis(10);
456            context.sleep(sleep_duration).await;
457
458            // After run, time should have advanced
459            let end = context.current();
460            assert!(end.duration_since(start).unwrap() >= sleep_duration);
461        });
462    }
463
464    fn test_clock_sleep_until<R: Runner>(runner: R)
465    where
466        R::Context: Spawner + Clock,
467    {
468        runner.start(|context| async move {
469            // Trigger sleep
470            let now = context.current();
471            context.sleep_until(now + Duration::from_millis(100)).await;
472
473            // Ensure slept duration has elapsed
474            let elapsed = now.elapsed().unwrap();
475            assert!(elapsed >= Duration::from_millis(100));
476        });
477    }
478
    /// Run a root task that spawns a never-ending task and then returns immediately.
    ///
    /// The test only completes if `start` returns once the root task is done, i.e. without
    /// waiting for the spawned task to finish.
    fn test_root_finishes<R: Runner>(runner: R)
    where
        R::Context: Spawner,
    {
        runner.start(|context| async move {
            // The handle is intentionally dropped: the spawned task is never awaited or aborted.
            context.spawn(|_| async move {
                // Yield back to the runtime forever
                loop {
                    reschedule().await;
                }
            });
        });
    }
491
492    fn test_spawn_abort<R: Runner>(runner: R)
493    where
494        R::Context: Spawner,
495    {
496        runner.start(|context| async move {
497            let handle = context.spawn(|_| async move {
498                loop {
499                    reschedule().await;
500                }
501            });
502            handle.abort();
503            assert!(matches!(handle.await, Err(Error::Closed)));
504        });
505    }
506
507    fn test_panic_aborts_root<R: Runner>(runner: R) {
508        let result = catch_unwind(AssertUnwindSafe(|| {
509            runner.start(|_| async move {
510                panic!("blah");
511            });
512        }));
513        result.unwrap_err();
514    }
515
516    fn test_panic_aborts_spawn<R: Runner>(runner: R)
517    where
518        R::Context: Spawner,
519    {
520        let result = runner.start(|context| async move {
521            let result = context.spawn(|_| async move {
522                panic!("blah");
523            });
524            assert!(matches!(result.await, Err(Error::Exited)));
525            Result::<(), Error>::Ok(())
526        });
527
528        // Ensure panic was caught
529        result.unwrap();
530    }
531
532    fn test_select<R: Runner>(runner: R) {
533        runner.start(|_| async move {
534            // Test first branch
535            let output = Mutex::new(0);
536            select! {
537                v1 = ready(1) => {
538                    *output.lock().unwrap() = v1;
539                },
540                v2 = ready(2) => {
541                    *output.lock().unwrap() = v2;
542                },
543            };
544            assert_eq!(*output.lock().unwrap(), 1);
545
546            // Test second branch
547            select! {
548                v1 = std::future::pending::<i32>() => {
549                    *output.lock().unwrap() = v1;
550                },
551                v2 = ready(2) => {
552                    *output.lock().unwrap() = v2;
553                },
554            };
555            assert_eq!(*output.lock().unwrap(), 2);
556        });
557    }
558
    /// Ensure future fusing works as expected.
    ///
    /// The same `receiver` is polled by several consecutive `select!` invocations. Messages
    /// sent to the channel must still be delivered, in order, after earlier invocations
    /// completed through a different branch.
    fn test_select_loop<R: Runner>(runner: R)
    where
        R::Context: Clock,
    {
        runner.start(|context| async move {
            // Should hit timeout (nothing has been sent yet, so only the sleep can complete)
            let (mut sender, mut receiver) = mpsc::unbounded();
            for _ in 0..2 {
                select! {
                    v = receiver.next() => {
                        panic!("unexpected value: {v:?}");
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }

            // Populate channel
            sender.send(0).await.unwrap();
            sender.send(1).await.unwrap();

            // Prefer not reading channel without losing messages (the always-ready branch is
            // listed first, so the channel branch must not run)
            select! {
                _ = async {} => {
                    // Skip reading from channel even though populated
                },
                v = receiver.next() => {
                    panic!("unexpected value: {v:?}");
                },
            };

            // Process messages (both must still be available, in the order they were sent)
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.next() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
605
606    fn test_storage_operations<R: Runner>(runner: R)
607    where
608        R::Context: Storage,
609    {
610        runner.start(|context| async move {
611            let partition = "test_partition";
612            let name = b"test_blob";
613
614            // Open a new blob
615            let (blob, _) = context
616                .open(partition, name)
617                .await
618                .expect("Failed to open blob");
619
620            // Write data to the blob
621            let data = b"Hello, Storage!";
622            blob.write_at(Vec::from(data), 0)
623                .await
624                .expect("Failed to write to blob");
625
626            // Sync the blob
627            blob.sync().await.expect("Failed to sync blob");
628
629            // Read data from the blob
630            let read = blob
631                .read_at(vec![0; data.len()], 0)
632                .await
633                .expect("Failed to read from blob");
634            assert_eq!(read.as_ref(), data);
635
636            // Sync the blob
637            blob.sync().await.expect("Failed to sync blob");
638
639            // Scan blobs in the partition
640            let blobs = context
641                .scan(partition)
642                .await
643                .expect("Failed to scan partition");
644            assert!(blobs.contains(&name.to_vec()));
645
646            // Reopen the blob
647            let (blob, len) = context
648                .open(partition, name)
649                .await
650                .expect("Failed to reopen blob");
651            assert_eq!(len, data.len() as u64);
652
653            // Read data part of message back
654            let read = blob
655                .read_at(vec![0u8; 7], 7)
656                .await
657                .expect("Failed to read data");
658            assert_eq!(read.as_ref(), b"Storage");
659
660            // Sync the blob
661            blob.sync().await.expect("Failed to sync blob");
662
663            // Remove the blob
664            context
665                .remove(partition, Some(name))
666                .await
667                .expect("Failed to remove blob");
668
669            // Ensure the blob is removed
670            let blobs = context
671                .scan(partition)
672                .await
673                .expect("Failed to scan partition");
674            assert!(!blobs.contains(&name.to_vec()));
675
676            // Remove the partition
677            context
678                .remove(partition, None)
679                .await
680                .expect("Failed to remove partition");
681
682            // Scan the partition
683            let result = context.scan(partition).await;
684            assert!(matches!(result, Err(Error::PartitionMissing(_))));
685        });
686    }
687
688    fn test_blob_read_write<R: Runner>(runner: R)
689    where
690        R::Context: Storage,
691    {
692        runner.start(|context| async move {
693            let partition = "test_partition";
694            let name = b"test_blob_rw";
695
696            // Open a new blob
697            let (blob, _) = context
698                .open(partition, name)
699                .await
700                .expect("Failed to open blob");
701
702            // Write data at different offsets
703            let data1 = b"Hello";
704            let data2 = b"World";
705            blob.write_at(Vec::from(data1), 0)
706                .await
707                .expect("Failed to write data1");
708            blob.write_at(Vec::from(data2), 5)
709                .await
710                .expect("Failed to write data2");
711
712            // Read data back
713            let read = blob
714                .read_at(vec![0u8; 10], 0)
715                .await
716                .expect("Failed to read data");
717            assert_eq!(&read.as_ref()[..5], data1);
718            assert_eq!(&read.as_ref()[5..], data2);
719
720            // Read past end of blob
721            let result = blob.read_at(vec![0u8; 10], 10).await;
722            assert!(result.is_err());
723
724            // Rewrite data without affecting length
725            let data3 = b"Store";
726            blob.write_at(Vec::from(data3), 5)
727                .await
728                .expect("Failed to write data3");
729
730            // Read data back
731            let read = blob
732                .read_at(vec![0u8; 10], 0)
733                .await
734                .expect("Failed to read data");
735            assert_eq!(&read.as_ref()[..5], data1);
736            assert_eq!(&read.as_ref()[5..], data3);
737
738            // Read past end of blob
739            let result = blob.read_at(vec![0u8; 10], 10).await;
740            assert!(result.is_err());
741        });
742    }
743
744    fn test_blob_resize<R: Runner>(runner: R)
745    where
746        R::Context: Storage,
747    {
748        runner.start(|context| async move {
749            let partition = "test_partition_resize";
750            let name = b"test_blob_resize";
751
752            // Open and write to a new blob
753            let (blob, _) = context
754                .open(partition, name)
755                .await
756                .expect("Failed to open blob");
757
758            let data = b"some data";
759            blob.write_at(data.to_vec(), 0)
760                .await
761                .expect("Failed to write");
762            blob.sync().await.expect("Failed to sync after write");
763
764            // Re-open and check length
765            let (blob, len) = context.open(partition, name).await.unwrap();
766            assert_eq!(len, data.len() as u64);
767
768            // Resize to extend the file
769            let new_len = (data.len() as u64) * 2;
770            blob.resize(new_len)
771                .await
772                .expect("Failed to resize to extend");
773            blob.sync().await.expect("Failed to sync after resize");
774
775            // Re-open and check length again
776            let (blob, len) = context.open(partition, name).await.unwrap();
777            assert_eq!(len, new_len);
778
779            // Read original data
780            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
781            assert_eq!(read_buf.as_ref(), data);
782
783            // Read extended part (should be zeros)
784            let extended_part = blob
785                .read_at(vec![0; data.len()], data.len() as u64)
786                .await
787                .unwrap();
788            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
789
790            // Truncate the blob
791            blob.resize(data.len() as u64).await.unwrap();
792            blob.sync().await.unwrap();
793
794            // Reopen to check truncation
795            let (blob, size) = context.open(partition, name).await.unwrap();
796            assert_eq!(size, data.len() as u64);
797
798            // Read truncated data
799            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
800            assert_eq!(read_buf.as_ref(), data);
801            blob.sync().await.unwrap();
802        });
803    }
804
805    fn test_many_partition_read_write<R: Runner>(runner: R)
806    where
807        R::Context: Storage,
808    {
809        runner.start(|context| async move {
810            let partitions = ["partition1", "partition2", "partition3"];
811            let name = b"test_blob_rw";
812            let data1 = b"Hello";
813            let data2 = b"World";
814
815            for (additional, partition) in partitions.iter().enumerate() {
816                // Open a new blob
817                let (blob, _) = context
818                    .open(partition, name)
819                    .await
820                    .expect("Failed to open blob");
821
822                // Write data at different offsets
823                blob.write_at(Vec::from(data1), 0)
824                    .await
825                    .expect("Failed to write data1");
826                blob.write_at(Vec::from(data2), 5 + additional as u64)
827                    .await
828                    .expect("Failed to write data2");
829
830                // Sync the blob
831                blob.sync().await.expect("Failed to sync blob");
832            }
833
834            for (additional, partition) in partitions.iter().enumerate() {
835                // Open a new blob
836                let (blob, len) = context
837                    .open(partition, name)
838                    .await
839                    .expect("Failed to open blob");
840                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
841
842                // Read data back
843                let read = blob
844                    .read_at(vec![0u8; 10 + additional], 0)
845                    .await
846                    .expect("Failed to read data");
847                assert_eq!(&read.as_ref()[..5], b"Hello");
848                assert_eq!(&read.as_ref()[5 + additional..], b"World");
849            }
850        });
851    }
852
853    fn test_blob_read_past_length<R: Runner>(runner: R)
854    where
855        R::Context: Storage,
856    {
857        runner.start(|context| async move {
858            let partition = "test_partition";
859            let name = b"test_blob_rw";
860
861            // Open a new blob
862            let (blob, _) = context
863                .open(partition, name)
864                .await
865                .expect("Failed to open blob");
866
867            // Read data past file length (empty file)
868            let result = blob.read_at(vec![0u8; 10], 0).await;
869            assert!(result.is_err());
870
871            // Write data to the blob
872            let data = b"Hello, Storage!".to_vec();
873            blob.write_at(data, 0)
874                .await
875                .expect("Failed to write to blob");
876
877            // Read data past file length (non-empty file)
878            let result = blob.read_at(vec![0u8; 20], 0).await;
879            assert!(result.is_err());
880        })
881    }
882
883    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
884    where
885        R::Context: Spawner + Storage + Metrics,
886    {
887        runner.start(|context| async move {
888            let partition = "test_partition";
889            let name = b"test_blob_rw";
890
891            // Open a new blob
892            let (blob, _) = context
893                .open(partition, name)
894                .await
895                .expect("Failed to open blob");
896
897            // Write data to the blob
898            let data = b"Hello, Storage!";
899            blob.write_at(Vec::from(data), 0)
900                .await
901                .expect("Failed to write to blob");
902
903            // Sync the blob
904            blob.sync().await.expect("Failed to sync blob");
905
906            // Read data from the blob in clone
907            let check1 = context.with_label("check1").spawn({
908                let blob = blob.clone();
909                move |_| async move {
910                    let read = blob
911                        .read_at(vec![0u8; data.len()], 0)
912                        .await
913                        .expect("Failed to read from blob");
914                    assert_eq!(read.as_ref(), data);
915                }
916            });
917            let check2 = context.with_label("check2").spawn({
918                let blob = blob.clone();
919                move |_| async move {
920                    let read = blob
921                        .read_at(vec![0; data.len()], 0)
922                        .await
923                        .expect("Failed to read from blob");
924                    assert_eq!(read.as_ref(), data);
925                }
926            });
927
928            // Wait for both reads to complete
929            let result = join!(check1, check2);
930            assert!(result.0.is_ok());
931            assert!(result.1.is_ok());
932
933            // Read data from the blob
934            let read = blob
935                .read_at(vec![0; data.len()], 0)
936                .await
937                .expect("Failed to read from blob");
938            assert_eq!(read.as_ref(), data);
939
940            // Drop the blob
941            drop(blob);
942
943            // Ensure no blobs still open
944            let buffer = context.encode();
945            assert!(buffer.contains("open_blobs 0"));
946        });
947    }
948
949    fn test_shutdown<R: Runner>(runner: R)
950    where
951        R::Context: Spawner + Metrics + Clock,
952    {
953        let kill = 9;
954        runner.start(|context| async move {
955            // Spawn a task that waits for signal
956            let before = context
957                .with_label("before")
958                .spawn(move |context| async move {
959                    let mut signal = context.stopped();
960                    let value = (&mut signal).await.unwrap();
961                    assert_eq!(value, kill);
962                    drop(signal);
963                });
964
965            // Signal the tasks and wait for them to stop
966            let result = context.clone().stop(kill, None).await;
967            assert!(result.is_ok());
968
969            // Spawn a task after stop is called
970            let after = context
971                .with_label("after")
972                .spawn(move |context| async move {
973                    // A call to `stopped()` after `stop()` resolves immediately
974                    let value = context.stopped().await.unwrap();
975                    assert_eq!(value, kill);
976                });
977
978            // Ensure both tasks complete
979            let result = join!(before, after);
980            assert!(result.0.is_ok());
981            assert!(result.1.is_ok());
982        });
983    }
984
985    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
986    where
987        R::Context: Spawner + Metrics + Clock,
988    {
989        let kill = 42;
990        runner.start(|context| async move {
991            let (started_tx, mut started_rx) = mpsc::channel(3);
992            let counter = Arc::new(AtomicU32::new(0));
993
994            // Spawn 3 tasks that do cleanup work after receiving stop signal
995            // and increment a shared counter
996            let task = |cleanup_duration: Duration| {
997                let context = context.clone();
998                let counter = counter.clone();
999                let mut started_tx = started_tx.clone();
1000                context.spawn(move |context| async move {
1001                    // Wait for signal to be acquired
1002                    let mut signal = context.stopped();
1003                    started_tx.send(()).await.unwrap();
1004
1005                    // Increment once killed
1006                    let value = (&mut signal).await.unwrap();
1007                    assert_eq!(value, kill);
1008                    context.sleep(cleanup_duration).await;
1009                    counter.fetch_add(1, Ordering::SeqCst);
1010
1011                    // Wait to drop signal until work has been done
1012                    drop(signal);
1013                })
1014            };
1015
1016            let task1 = task(Duration::from_millis(10));
1017            let task2 = task(Duration::from_millis(20));
1018            let task3 = task(Duration::from_millis(30));
1019
1020            // Give tasks time to start
1021            for _ in 0..3 {
1022                started_rx.next().await.unwrap();
1023            }
1024
1025            // Stop and verify all cleanup completed
1026            context.stop(kill, None).await.unwrap();
1027            assert_eq!(counter.load(Ordering::SeqCst), 3);
1028
1029            // Ensure all tasks completed
1030            let result = join!(task1, task2, task3);
1031            assert!(result.0.is_ok());
1032            assert!(result.1.is_ok());
1033            assert!(result.2.is_ok());
1034        });
1035    }
1036
1037    fn test_shutdown_timeout<R: Runner>(runner: R)
1038    where
1039        R::Context: Spawner + Metrics + Clock,
1040    {
1041        let kill = 42;
1042        runner.start(|context| async move {
1043            // Setup startup coordinator
1044            let (started_tx, started_rx) = oneshot::channel();
1045
1046            // Spawn a task that never completes its cleanup
1047            context.clone().spawn(move |context| async move {
1048                let signal = context.stopped();
1049                started_tx.send(()).unwrap();
1050                pending::<()>().await;
1051                signal.await.unwrap();
1052            });
1053
1054            // Try to stop with a timeout
1055            started_rx.await.unwrap();
1056            let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1057
1058            // Assert that we got a timeout error
1059            assert!(matches!(result, Err(Error::Timeout)));
1060        });
1061    }
1062
1063    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1064    where
1065        R::Context: Spawner + Metrics + Clock,
1066    {
1067        let kill1 = 42;
1068        let kill2 = 43;
1069
1070        runner.start(|context| async move {
1071            let (started_tx, started_rx) = oneshot::channel();
1072            let counter = Arc::new(AtomicU32::new(0));
1073
1074            // Spawn a task that delays completion to test timing
1075            let task = context.with_label("blocking_task").spawn({
1076                let counter = counter.clone();
1077                move |context| async move {
1078                    // Wait for signal to be acquired
1079                    let mut signal = context.stopped();
1080                    started_tx.send(()).unwrap();
1081
1082                    // Wait for signal to be resolved
1083                    let value = (&mut signal).await.unwrap();
1084                    assert_eq!(value, kill1);
1085                    context.sleep(Duration::from_millis(50)).await;
1086
1087                    // Increment counter
1088                    counter.fetch_add(1, Ordering::SeqCst);
1089                    drop(signal);
1090                }
1091            });
1092
1093            // Give task time to start
1094            started_rx.await.unwrap();
1095
1096            // Issue two separate stop calls
1097            // The second stop call uses a different stop value that should be ignored
1098            let stop_task1 = context.clone().stop(kill1, None);
1099            pin_mut!(stop_task1);
1100            let stop_task2 = context.clone().stop(kill2, None);
1101            pin_mut!(stop_task2);
1102
1103            // Both of them should be awaiting completion
1104            assert!(stop_task1.as_mut().now_or_never().is_none());
1105            assert!(stop_task2.as_mut().now_or_never().is_none());
1106
1107            // Wait for both stop calls to complete
1108            assert!(stop_task1.await.is_ok());
1109            assert!(stop_task2.await.is_ok());
1110
1111            // Verify first stop value wins
1112            let sig = context.stopped().await;
1113            assert_eq!(sig.unwrap(), kill1);
1114
1115            // Wait for blocking task to complete
1116            let result = task.await;
1117            assert!(result.is_ok());
1118            assert_eq!(counter.load(Ordering::SeqCst), 1);
1119
1120            // Post-completion stop should return immediately
1121            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1122        });
1123    }
1124
1125    fn test_spawn_ref<R: Runner>(runner: R)
1126    where
1127        R::Context: Spawner,
1128    {
1129        runner.start(|mut context| async move {
1130            let handle = context.spawn_ref();
1131            let result = handle(async move { 42 }).await;
1132            assert!(matches!(result, Ok(42)));
1133        });
1134    }
1135
1136    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1137    where
1138        R::Context: Spawner,
1139    {
1140        runner.start(|mut context| async move {
1141            let handle = context.spawn_ref();
1142            let result = handle(async move { 42 }).await;
1143            assert!(matches!(result, Ok(42)));
1144
1145            // Ensure context is consumed
1146            let handle = context.spawn_ref();
1147            let result = handle(async move { 42 }).await;
1148            assert!(matches!(result, Ok(42)));
1149        });
1150    }
1151
1152    fn test_spawn_duplicate<R: Runner>(runner: R)
1153    where
1154        R::Context: Spawner,
1155    {
1156        runner.start(|mut context| async move {
1157            let handle = context.spawn_ref();
1158            let result = handle(async move { 42 }).await;
1159            assert!(matches!(result, Ok(42)));
1160
1161            // Ensure context is consumed
1162            context.spawn(|_| async move { 42 });
1163        });
1164    }
1165
1166    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1167    where
1168        R::Context: Spawner,
1169    {
1170        runner.start(|context| async move {
1171            let handle = context.spawn_blocking(dedicated, |_| 42);
1172            let result = handle.await;
1173            assert!(matches!(result, Ok(42)));
1174        });
1175    }
1176
1177    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1178    where
1179        R::Context: Spawner,
1180    {
1181        runner.start(|mut context| async move {
1182            let spawn = context.spawn_blocking_ref(dedicated);
1183            let handle = spawn(|| 42);
1184            let result = handle.await;
1185            assert!(matches!(result, Ok(42)));
1186        });
1187    }
1188
1189    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1190    where
1191        R::Context: Spawner,
1192    {
1193        runner.start(|mut context| async move {
1194            let spawn = context.spawn_blocking_ref(dedicated);
1195            let result = spawn(|| 42).await;
1196            assert!(matches!(result, Ok(42)));
1197
1198            // Ensure context is consumed
1199            context.spawn_blocking(dedicated, |_| 42);
1200        });
1201    }
1202
1203    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1204    where
1205        R::Context: Spawner,
1206    {
1207        runner.start(|context| async move {
1208            // Create task
1209            let (sender, mut receiver) = oneshot::channel();
1210            let handle = context.spawn_blocking(dedicated, move |_| {
1211                // Wait for abort to be called
1212                loop {
1213                    if receiver.try_recv().is_ok() {
1214                        break;
1215                    }
1216                }
1217
1218                // Perform a long-running operation
1219                let mut count = 0;
1220                loop {
1221                    count += 1;
1222                    if count >= 100_000_000 {
1223                        break;
1224                    }
1225                }
1226                count
1227            });
1228
1229            // Abort the task
1230            //
1231            // If there was an `.await` prior to sending a message over the oneshot, this test
1232            // could deadlock (depending on the runtime implementation) because the blocking task
1233            // would never yield (preventing send from being called).
1234            handle.abort();
1235            sender.send(()).unwrap();
1236
1237            // Wait for the task to complete
1238            assert!(matches!(handle.await, Ok(100_000_000)));
1239        });
1240    }
1241
1242    fn test_metrics<R: Runner>(runner: R)
1243    where
1244        R::Context: Metrics,
1245    {
1246        runner.start(|context| async move {
1247            // Assert label
1248            assert_eq!(context.label(), "");
1249
1250            // Register a metric
1251            let counter = Counter::<u64>::default();
1252            context.register("test", "test", counter.clone());
1253
1254            // Increment the counter
1255            counter.inc();
1256
1257            // Encode metrics
1258            let buffer = context.encode();
1259            assert!(buffer.contains("test_total 1"));
1260
1261            // Nested context
1262            let context = context.with_label("nested");
1263            let nested_counter = Counter::<u64>::default();
1264            context.register("test", "test", nested_counter.clone());
1265
1266            // Increment the counter
1267            nested_counter.inc();
1268
1269            // Encode metrics
1270            let buffer = context.encode();
1271            assert!(buffer.contains("nested_test_total 1"));
1272            assert!(buffer.contains("test_total 1"));
1273        });
1274    }
1275
1276    fn test_metrics_label<R: Runner>(runner: R)
1277    where
1278        R::Context: Metrics,
1279    {
1280        runner.start(|context| async move {
1281            context.with_label(METRICS_PREFIX);
1282        })
1283    }
1284
1285    #[test]
1286    fn test_deterministic_future() {
1287        let runner = deterministic::Runner::default();
1288        test_error_future(runner);
1289    }
1290
1291    #[test]
1292    fn test_deterministic_clock_sleep() {
1293        let executor = deterministic::Runner::default();
1294        test_clock_sleep(executor);
1295    }
1296
1297    #[test]
1298    fn test_deterministic_clock_sleep_until() {
1299        let executor = deterministic::Runner::default();
1300        test_clock_sleep_until(executor);
1301    }
1302
1303    #[test]
1304    fn test_deterministic_root_finishes() {
1305        let executor = deterministic::Runner::default();
1306        test_root_finishes(executor);
1307    }
1308
1309    #[test]
1310    fn test_deterministic_spawn_abort() {
1311        let executor = deterministic::Runner::default();
1312        test_spawn_abort(executor);
1313    }
1314
1315    #[test]
1316    fn test_deterministic_panic_aborts_root() {
1317        let runner = deterministic::Runner::default();
1318        test_panic_aborts_root(runner);
1319    }
1320
1321    #[test]
1322    #[should_panic(expected = "blah")]
1323    fn test_deterministic_panic_aborts_spawn() {
1324        let executor = deterministic::Runner::default();
1325        test_panic_aborts_spawn(executor);
1326    }
1327
1328    #[test]
1329    fn test_deterministic_select() {
1330        let executor = deterministic::Runner::default();
1331        test_select(executor);
1332    }
1333
1334    #[test]
1335    fn test_deterministic_select_loop() {
1336        let executor = deterministic::Runner::default();
1337        test_select_loop(executor);
1338    }
1339
1340    #[test]
1341    fn test_deterministic_storage_operations() {
1342        let executor = deterministic::Runner::default();
1343        test_storage_operations(executor);
1344    }
1345
1346    #[test]
1347    fn test_deterministic_blob_read_write() {
1348        let executor = deterministic::Runner::default();
1349        test_blob_read_write(executor);
1350    }
1351
1352    #[test]
1353    fn test_deterministic_blob_resize() {
1354        let executor = deterministic::Runner::default();
1355        test_blob_resize(executor);
1356    }
1357
1358    #[test]
1359    fn test_deterministic_many_partition_read_write() {
1360        let executor = deterministic::Runner::default();
1361        test_many_partition_read_write(executor);
1362    }
1363
1364    #[test]
1365    fn test_deterministic_blob_read_past_length() {
1366        let executor = deterministic::Runner::default();
1367        test_blob_read_past_length(executor);
1368    }
1369
1370    #[test]
1371    fn test_deterministic_blob_clone_and_concurrent_read() {
1372        // Run test
1373        let executor = deterministic::Runner::default();
1374        test_blob_clone_and_concurrent_read(executor);
1375    }
1376
1377    #[test]
1378    fn test_deterministic_shutdown() {
1379        let executor = deterministic::Runner::default();
1380        test_shutdown(executor);
1381    }
1382
1383    #[test]
1384    fn test_deterministic_shutdown_multiple_signals() {
1385        let executor = deterministic::Runner::default();
1386        test_shutdown_multiple_signals(executor);
1387    }
1388
1389    #[test]
1390    fn test_deterministic_shutdown_timeout() {
1391        let executor = deterministic::Runner::default();
1392        test_shutdown_timeout(executor);
1393    }
1394
1395    #[test]
1396    fn test_deterministic_shutdown_multiple_stop_calls() {
1397        let executor = deterministic::Runner::default();
1398        test_shutdown_multiple_stop_calls(executor);
1399    }
1400
1401    #[test]
1402    fn test_deterministic_spawn_ref() {
1403        let executor = deterministic::Runner::default();
1404        test_spawn_ref(executor);
1405    }
1406
1407    #[test]
1408    #[should_panic]
1409    fn test_deterministic_spawn_ref_duplicate() {
1410        let executor = deterministic::Runner::default();
1411        test_spawn_ref_duplicate(executor);
1412    }
1413
1414    #[test]
1415    #[should_panic]
1416    fn test_deterministic_spawn_duplicate() {
1417        let executor = deterministic::Runner::default();
1418        test_spawn_duplicate(executor);
1419    }
1420
1421    #[test]
1422    fn test_deterministic_spawn_blocking() {
1423        for dedicated in [false, true] {
1424            let executor = deterministic::Runner::default();
1425            test_spawn_blocking(executor, dedicated);
1426        }
1427    }
1428
1429    #[test]
1430    #[should_panic(expected = "blocking task panicked")]
1431    fn test_deterministic_spawn_blocking_panic() {
1432        for dedicated in [false, true] {
1433            let executor = deterministic::Runner::default();
1434            executor.start(|context| async move {
1435                let handle = context.spawn_blocking(dedicated, |_| {
1436                    panic!("blocking task panicked");
1437                });
1438                handle.await.unwrap();
1439            });
1440        }
1441    }
1442
1443    #[test]
1444    fn test_deterministic_spawn_blocking_abort() {
1445        for dedicated in [false, true] {
1446            let executor = deterministic::Runner::default();
1447            test_spawn_blocking_abort(executor, dedicated);
1448        }
1449    }
1450
1451    #[test]
1452    fn test_deterministic_spawn_blocking_ref() {
1453        for dedicated in [false, true] {
1454            let executor = deterministic::Runner::default();
1455            test_spawn_blocking_ref(executor, dedicated);
1456        }
1457    }
1458
1459    #[test]
1460    #[should_panic]
1461    fn test_deterministic_spawn_blocking_ref_duplicate() {
1462        for dedicated in [false, true] {
1463            let executor = deterministic::Runner::default();
1464            test_spawn_blocking_ref_duplicate(executor, dedicated);
1465        }
1466    }
1467
1468    #[test]
1469    fn test_deterministic_metrics() {
1470        let executor = deterministic::Runner::default();
1471        test_metrics(executor);
1472    }
1473
1474    #[test]
1475    #[should_panic]
1476    fn test_deterministic_metrics_label() {
1477        let executor = deterministic::Runner::default();
1478        test_metrics_label(executor);
1479    }
1480
1481    #[test]
1482    fn test_tokio_error_future() {
1483        let runner = tokio::Runner::default();
1484        test_error_future(runner);
1485    }
1486
1487    #[test]
1488    fn test_tokio_clock_sleep() {
1489        let executor = tokio::Runner::default();
1490        test_clock_sleep(executor);
1491    }
1492
1493    #[test]
1494    fn test_tokio_clock_sleep_until() {
1495        let executor = tokio::Runner::default();
1496        test_clock_sleep_until(executor);
1497    }
1498
1499    #[test]
1500    fn test_tokio_root_finishes() {
1501        let executor = tokio::Runner::default();
1502        test_root_finishes(executor);
1503    }
1504
1505    #[test]
1506    fn test_tokio_spawn_abort() {
1507        let executor = tokio::Runner::default();
1508        test_spawn_abort(executor);
1509    }
1510
1511    #[test]
1512    fn test_tokio_panic_aborts_root() {
1513        let executor = tokio::Runner::default();
1514        test_panic_aborts_root(executor);
1515    }
1516
1517    #[test]
1518    fn test_tokio_panic_aborts_spawn() {
1519        let executor = tokio::Runner::default();
1520        test_panic_aborts_spawn(executor);
1521    }
1522
1523    #[test]
1524    fn test_tokio_select() {
1525        let executor = tokio::Runner::default();
1526        test_select(executor);
1527    }
1528
1529    #[test]
1530    fn test_tokio_select_loop() {
1531        let executor = tokio::Runner::default();
1532        test_select_loop(executor);
1533    }
1534
1535    #[test]
1536    fn test_tokio_storage_operations() {
1537        let executor = tokio::Runner::default();
1538        test_storage_operations(executor);
1539    }
1540
1541    #[test]
1542    fn test_tokio_blob_read_write() {
1543        let executor = tokio::Runner::default();
1544        test_blob_read_write(executor);
1545    }
1546
1547    #[test]
1548    fn test_tokio_blob_resize() {
1549        let executor = tokio::Runner::default();
1550        test_blob_resize(executor);
1551    }
1552
1553    #[test]
1554    fn test_tokio_many_partition_read_write() {
1555        let executor = tokio::Runner::default();
1556        test_many_partition_read_write(executor);
1557    }
1558
1559    #[test]
1560    fn test_tokio_blob_read_past_length() {
1561        let executor = tokio::Runner::default();
1562        test_blob_read_past_length(executor);
1563    }
1564
1565    #[test]
1566    fn test_tokio_blob_clone_and_concurrent_read() {
1567        // Run test
1568        let executor = tokio::Runner::default();
1569        test_blob_clone_and_concurrent_read(executor);
1570    }
1571
1572    #[test]
1573    fn test_tokio_shutdown() {
1574        let executor = tokio::Runner::default();
1575        test_shutdown(executor);
1576    }
1577
1578    #[test]
1579    fn test_tokio_shutdown_multiple_signals() {
1580        let executor = tokio::Runner::default();
1581        test_shutdown_multiple_signals(executor);
1582    }
1583
1584    #[test]
1585    fn test_tokio_shutdown_timeout() {
1586        let executor = tokio::Runner::default();
1587        test_shutdown_timeout(executor);
1588    }
1589
1590    #[test]
1591    fn test_tokio_shutdown_multiple_stop_calls() {
1592        let executor = tokio::Runner::default();
1593        test_shutdown_multiple_stop_calls(executor);
1594    }
1595
1596    #[test]
1597    fn test_tokio_spawn_ref() {
1598        let executor = tokio::Runner::default();
1599        test_spawn_ref(executor);
1600    }
1601
1602    #[test]
1603    #[should_panic]
1604    fn test_tokio_spawn_ref_duplicate() {
1605        let executor = tokio::Runner::default();
1606        test_spawn_ref_duplicate(executor);
1607    }
1608
1609    #[test]
1610    #[should_panic]
1611    fn test_tokio_spawn_duplicate() {
1612        let executor = tokio::Runner::default();
1613        test_spawn_duplicate(executor);
1614    }
1615
1616    #[test]
1617    fn test_tokio_spawn_blocking() {
1618        for dedicated in [false, true] {
1619            let executor = tokio::Runner::default();
1620            test_spawn_blocking(executor, dedicated);
1621        }
1622    }
1623
1624    #[test]
1625    fn test_tokio_spawn_blocking_panic() {
1626        for dedicated in [false, true] {
1627            let executor = tokio::Runner::default();
1628            executor.start(|context| async move {
1629                let handle = context.spawn_blocking(dedicated, |_| {
1630                    panic!("blocking task panicked");
1631                });
1632                let result = handle.await;
1633                assert!(matches!(result, Err(Error::Exited)));
1634            });
1635        }
1636    }
1637
1638    #[test]
1639    fn test_tokio_spawn_blocking_abort() {
1640        for dedicated in [false, true] {
1641            let executor = tokio::Runner::default();
1642            test_spawn_blocking_abort(executor, dedicated);
1643        }
1644    }
1645
1646    #[test]
1647    fn test_tokio_spawn_blocking_ref() {
1648        for dedicated in [false, true] {
1649            let executor = tokio::Runner::default();
1650            test_spawn_blocking_ref(executor, dedicated);
1651        }
1652    }
1653
1654    #[test]
1655    #[should_panic]
1656    fn test_tokio_spawn_blocking_ref_duplicate() {
1657        for dedicated in [false, true] {
1658            let executor = tokio::Runner::default();
1659            test_spawn_blocking_ref_duplicate(executor, dedicated);
1660        }
1661    }
1662
1663    #[test]
1664    fn test_tokio_metrics() {
1665        let executor = tokio::Runner::default();
1666        test_metrics(executor);
1667    }
1668
1669    #[test]
1670    #[should_panic]
1671    fn test_tokio_metrics_label() {
1672        let executor = tokio::Runner::default();
1673        test_metrics_label(executor);
1674    }
1675
1676    #[test]
1677    fn test_tokio_process_rss_metric() {
1678        let executor = tokio::Runner::default();
1679        executor.start(|context| async move {
1680            loop {
1681                // Wait for RSS metric to be available
1682                let metrics = context.encode();
1683                if !metrics.contains("runtime_process_rss") {
1684                    context.sleep(Duration::from_millis(100)).await;
1685                    continue;
1686                }
1687
1688                // Verify the RSS value is eventually populated (greater than 0)
1689                for line in metrics.lines() {
1690                    if line.starts_with("runtime_process_rss")
1691                        && !line.starts_with("runtime_process_rss{")
1692                    {
1693                        let parts: Vec<&str> = line.split_whitespace().collect();
1694                        if parts.len() >= 2 {
1695                            let rss_value: i64 =
1696                                parts[1].parse().expect("Failed to parse RSS value");
1697                            if rss_value > 0 {
1698                                return;
1699                            }
1700                        }
1701                    }
1702                }
1703            }
1704        });
1705    }
1706
1707    #[test]
1708    fn test_tokio_telemetry() {
1709        let executor = tokio::Runner::default();
1710        executor.start(|context| async move {
1711            // Define the server address
1712            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1713
1714            // Configure telemetry
1715            tokio::telemetry::init(
1716                context.with_label("metrics"),
1717                tokio::telemetry::Logging {
1718                    level: Level::INFO,
1719                    json: false,
1720                },
1721                Some(address),
1722                None,
1723            );
1724
1725            // Register a test metric
1726            let counter: Counter<u64> = Counter::default();
1727            context.register("test_counter", "Test counter", counter.clone());
1728            counter.inc();
1729
1730            // Helper functions to parse HTTP response
1731            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1732                let mut line = Vec::new();
1733                loop {
1734                    let byte = stream.recv(vec![0; 1]).await?;
1735                    if byte[0] == b'\n' {
1736                        if line.last() == Some(&b'\r') {
1737                            line.pop(); // Remove trailing \r
1738                        }
1739                        break;
1740                    }
1741                    line.push(byte[0]);
1742                }
1743                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1744            }
1745
1746            async fn read_headers<St: Stream>(
1747                stream: &mut St,
1748            ) -> Result<HashMap<String, String>, Error> {
1749                let mut headers = HashMap::new();
1750                loop {
1751                    let line = read_line(stream).await?;
1752                    if line.is_empty() {
1753                        break;
1754                    }
1755                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1756                    if parts.len() == 2 {
1757                        headers.insert(parts[0].to_string(), parts[1].to_string());
1758                    }
1759                }
1760                Ok(headers)
1761            }
1762
1763            async fn read_body<St: Stream>(
1764                stream: &mut St,
1765                content_length: usize,
1766            ) -> Result<String, Error> {
1767                let read = stream.recv(vec![0; content_length]).await?;
1768                String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
1769            }
1770
1771            // Simulate a client connecting to the server
1772            let client_handle = context
1773                .with_label("client")
1774                .spawn(move |context| async move {
1775                    let (mut sink, mut stream) = loop {
1776                        match context.dial(address).await {
1777                            Ok((sink, stream)) => break (sink, stream),
1778                            Err(e) => {
1779                                // The client may be polled before the server is ready, that's alright!
1780                                error!(err =?e, "failed to connect");
1781                                context.sleep(Duration::from_millis(10)).await;
1782                            }
1783                        }
1784                    };
1785
1786                    // Send a GET request to the server
1787                    let request = format!(
1788                        "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
1789                    );
1790                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
1791
1792                    // Read and verify the HTTP status line
1793                    let status_line = read_line(&mut stream).await.unwrap();
1794                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1795
1796                    // Read and parse headers
1797                    let headers = read_headers(&mut stream).await.unwrap();
1798                    println!("Headers: {headers:?}");
1799                    let content_length = headers
1800                        .get("content-length")
1801                        .unwrap()
1802                        .parse::<usize>()
1803                        .unwrap();
1804
1805                    // Read and verify the body
1806                    let body = read_body(&mut stream, content_length).await.unwrap();
1807                    assert!(body.contains("test_counter_total 1"));
1808                });
1809
1810            // Wait for the client task to complete
1811            client_handle.await.unwrap();
1812        });
1813    }
1814}