commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23    future::Future,
24    net::SocketAddr,
25    time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32    if #[cfg(not(target_arch = "wasm32"))] {
33        pub mod tokio;
34        pub mod benchmarks;
35    }
36}
37mod network;
38mod storage;
39pub mod telemetry;
40mod utils;
41pub use utils::{
42    create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
43};
44
/// Prefix reserved for metrics emitted by the runtime itself.
///
/// `Metrics::scoped_label` asserts that no scoped label starts with this value.
const METRICS_PREFIX: &str = "runtime";
47
/// Errors that can occur when interacting with the runtime.
///
/// The `Display` text of every variant is given by its `#[error(...)]` attribute.
/// For the blob variants, the `{0}/{1}` pair in the message is presumably
/// `partition/blob name` — confirm against the storage implementations.
#[derive(Error, Debug)]
pub enum Error {
    /// A task ended without producing a value. The tests in this file expect this
    /// when awaiting the handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// The tests in this file expect this when awaiting the handle of an aborted task.
    #[error("closed")]
    Closed,
    /// An operation timed out.
    #[error("timeout")]
    Timeout,
    /// Binding failed (presumably produced by `Network::bind` implementations).
    #[error("bind failed")]
    BindFailed,
    /// Connecting failed (presumably produced by `Network::dial` implementations).
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// A send failed (presumably produced by `Sink::send` implementations).
    #[error("send failed")]
    SendFailed,
    /// A receive failed (presumably produced by `Stream::recv` implementations).
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the payload is included in the message.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist. The tests in this file expect this from
    /// `Storage::scan` after the partition has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt; the payload is included in the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed with the given I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Truncating a blob failed with the given I/O error.
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    /// Syncing a blob failed with the given I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed with the given I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// A blob is too short for the requested operation (presumably a read past
    /// the end of the blob — confirm against the storage implementations).
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
}
90
/// Interface that any task scheduler must implement to start
/// running tasks.
pub trait Runner {
    /// Context defines the environment available to tasks.
    /// Examples of services the context may provide include:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Start running a root task.
    ///
    /// `f` receives the runtime's [Runner::Context] and returns the root future;
    /// the output of that future is returned to the caller. The runner is
    /// consumed by this call.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
107
/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Enqueue a task to be executed.
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// Spawned tasks consume the context used to create them. This ensures that context cannot
    /// be shared between tasks and that a task's context always comes from somewhere.
    ///
    /// `f` receives the consumed context and returns the future to run; awaiting the
    /// returned [Handle] yields the task's output.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a task to be executed (without consuming the context).
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
    /// an actor that already has a reference to context).
    ///
    /// The returned closure accepts the future to run and produces its [Handle].
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
    /// to prevent accidental misuse.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed.
    ///
    /// This method is designed for synchronous, potentially long-running operations that should
    /// not block the asynchronous event loop. The task starts executing immediately, and the
    /// returned handle can be awaited to retrieve the result.
    ///
    /// # Warning
    ///
    /// Blocking tasks cannot be aborted.
    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals the runtime to stop execution and that all outstanding tasks
    /// should perform any required cleanup and exit. This method is idempotent and
    /// can be called multiple times.
    ///
    /// This method does not actually kill any tasks but rather signals to them, using
    /// the `Signal` returned by `stopped`, that they should exit.
    ///
    /// `value` is delivered to waiting tasks: the tests in this file assert that
    /// awaiting the `Signal` yields the value passed to `stop`.
    fn stop(&self, value: i32);

    /// Returns an instance of a `Signal` that resolves when `stop` is called by
    /// any task.
    ///
    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
    fn stopped(&self) -> Signal;
}
168
169/// Interface to register and encode metrics.
170pub trait Metrics: Clone + Send + Sync + 'static {
171    /// Get the current label of the context.
172    fn label(&self) -> String;
173
174    /// Create a new instance of `Metrics` with the given label appended to the end
175    /// of the current `Metrics` label.
176    ///
177    /// This is commonly used to create a nested context for `register`.
178    ///
179    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
180    /// label (reserved for metrics for the runtime).
181    fn with_label(&self, label: &str) -> Self;
182
183    /// Prefix the given label with the current context's label.
184    ///
185    /// Unlike `with_label`, this method does not create a new context.
186    fn scoped_label(&self, label: &str) -> String {
187        let label = if self.label().is_empty() {
188            label.to_string()
189        } else {
190            format!("{}_{}", self.label(), label)
191        };
192        assert!(
193            !label.starts_with(METRICS_PREFIX),
194            "using runtime label is not allowed"
195        );
196        label
197    }
198
199    /// Register a metric with the runtime.
200    ///
201    /// Any registered metric will include (as a prefix) the label of the current context.
202    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
203
204    /// Encode all metrics into a buffer.
205    fn encode(&self) -> String;
206}
207
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    ///
    /// Because time may be mocked, callers measuring elapsed time should compare
    /// two values returned by this method rather than consulting the system clock.
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
223
224/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
225pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
226
227/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
228pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
229
230/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
231pub type ListenerOf<N> = <N as crate::Network>::Listener;
232
/// Interface that any runtime must implement to create
/// network connections.
pub trait Network: Clone + Send + Sync + 'static {
    /// The type of [Listener] that's returned when binding to a socket.
    /// Accepting a connection returns a [Sink] and [Stream] which are defined
    /// by the [Listener] and used to send and receive data over the connection.
    type Listener: Listener;

    /// Bind to the given socket address.
    ///
    /// Resolves to a [Network::Listener] on success.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// Resolves to the [Sink] and [Stream] halves of the connection on success.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
253
/// Interface that any runtime must implement to handle
/// incoming network connections.
pub trait Listener: Sync + Send + 'static {
    /// The type of [Sink] that's returned when accepting a connection.
    /// This is used to send data to the remote connection.
    type Sink: Sink;
    /// The type of [Stream] that's returned when accepting a connection.
    /// This is used to receive data from the remote connection.
    type Stream: Stream;

    /// Accept an incoming connection.
    ///
    /// Resolves to a socket address (presumably the remote peer's — confirm against
    /// the implementations) together with the [Sink] and [Stream] for the connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the local address of the listener.
    ///
    /// Unlike the other methods, failures are reported as a raw `std::io::Error`.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
272
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// `msg` holds the bytes to send; the future resolves to an [Error] on failure.
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
279
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer, which is why
    /// no byte count is returned.
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
287
/// Interface to interact with storage.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs (not the storage itself) are responsible for maintaining
/// synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage: Clone + Send + Sync + 'static {
    /// The readable/writeable storage buffer that can be opened by this Storage.
    type Blob: Blob;

    /// Open an existing blob in a given partition or create a new one, returning
    /// the blob and its length.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return the names of all blobs in a given partition.
    ///
    /// The tests in this file expect `Error::PartitionMissing` when the partition
    /// has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
322
323/// Interface to read and write to a blob.
324///
325/// To support blob implementations that enable concurrent reads and
326/// writes, blobs are responsible for maintaining synchronization.
327///
328/// Cloning a blob is similar to wrapping a single file descriptor in
329/// a lock whereas opening a new blob (of the same name) is similar to
330/// opening a new file descriptor. If multiple blobs are opened with the same
331/// name, they are not expected to coordinate access to underlying storage
332/// and writing to both is undefined behavior.
333#[allow(clippy::len_without_is_empty)]
334pub trait Blob: Clone + Send + Sync + 'static {
335    /// Read from the blob at the given offset.
336    ///
337    /// `read_at` does not return the number of bytes read because it
338    /// only returns once the entire buffer has been filled.
339    fn read_at(
340        &self,
341        buf: &mut [u8],
342        offset: u64,
343    ) -> impl Future<Output = Result<(), Error>> + Send;
344
345    /// Write to the blob at the given offset.
346    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
347
348    /// Truncate the blob to the given length.
349    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
350
351    /// Ensure all pending data is durably persisted.
352    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
353
354    /// Close the blob.
355    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361    use commonware_macros::select;
362    use futures::channel::oneshot;
363    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
364    use prometheus_client::metrics::counter::Counter;
365    use std::collections::HashMap;
366    use std::panic::{catch_unwind, AssertUnwindSafe};
367    use std::str::FromStr;
368    use std::sync::Mutex;
369    use tracing::{error, Level};
370    use utils::reschedule;
371
372    fn test_error_future<R: Runner>(runner: R) {
373        async fn error_future() -> Result<&'static str, &'static str> {
374            Err("An error occurred")
375        }
376        let result = runner.start(|_| error_future());
377        assert_eq!(result, Err("An error occurred"));
378    }
379
380    fn test_clock_sleep<R: Runner>(runner: R)
381    where
382        R::Context: Spawner + Clock,
383    {
384        runner.start(|context| async move {
385            // Capture initial time
386            let start = context.current();
387            let sleep_duration = Duration::from_millis(10);
388            context.sleep(sleep_duration).await;
389
390            // After run, time should have advanced
391            let end = context.current();
392            assert!(end.duration_since(start).unwrap() >= sleep_duration);
393        });
394    }
395
396    fn test_clock_sleep_until<R: Runner>(runner: R)
397    where
398        R::Context: Spawner + Clock,
399    {
400        runner.start(|context| async move {
401            // Trigger sleep
402            let now = context.current();
403            context.sleep_until(now + Duration::from_millis(100)).await;
404
405            // Ensure slept duration has elapsed
406            let elapsed = now.elapsed().unwrap();
407            assert!(elapsed >= Duration::from_millis(100));
408        });
409    }
410
411    fn test_root_finishes<R: Runner>(runner: R)
412    where
413        R::Context: Spawner,
414    {
415        runner.start(|context| async move {
416            context.spawn(|_| async move {
417                loop {
418                    reschedule().await;
419                }
420            });
421        });
422    }
423
424    fn test_spawn_abort<R: Runner>(runner: R)
425    where
426        R::Context: Spawner,
427    {
428        runner.start(|context| async move {
429            let handle = context.spawn(|_| async move {
430                loop {
431                    reschedule().await;
432                }
433            });
434            handle.abort();
435            assert!(matches!(handle.await, Err(Error::Closed)));
436        });
437    }
438
439    fn test_panic_aborts_root<R: Runner>(runner: R) {
440        let result = catch_unwind(AssertUnwindSafe(|| {
441            runner.start(|_| async move {
442                panic!("blah");
443            });
444        }));
445        result.unwrap_err();
446    }
447
448    fn test_panic_aborts_spawn<R: Runner>(runner: R)
449    where
450        R::Context: Spawner,
451    {
452        let result = runner.start(|context| async move {
453            let result = context.spawn(|_| async move {
454                panic!("blah");
455            });
456            assert!(matches!(result.await, Err(Error::Exited)));
457            Result::<(), Error>::Ok(())
458        });
459
460        // Ensure panic was caught
461        result.unwrap();
462    }
463
464    fn test_select<R: Runner>(runner: R) {
465        runner.start(|_| async move {
466            // Test first branch
467            let output = Mutex::new(0);
468            select! {
469                v1 = ready(1) => {
470                    *output.lock().unwrap() = v1;
471                },
472                v2 = ready(2) => {
473                    *output.lock().unwrap() = v2;
474                },
475            };
476            assert_eq!(*output.lock().unwrap(), 1);
477
478            // Test second branch
479            select! {
480                v1 = std::future::pending::<i32>() => {
481                    *output.lock().unwrap() = v1;
482                },
483                v2 = ready(2) => {
484                    *output.lock().unwrap() = v2;
485                },
486            };
487            assert_eq!(*output.lock().unwrap(), 2);
488        });
489    }
490
491    /// Ensure future fusing works as expected.
492    fn test_select_loop<R: Runner>(runner: R)
493    where
494        R::Context: Clock,
495    {
496        runner.start(|context| async move {
497            // Should hit timeout
498            let (mut sender, mut receiver) = mpsc::unbounded();
499            for _ in 0..2 {
500                select! {
501                    v = receiver.next() => {
502                        panic!("unexpected value: {:?}", v);
503                    },
504                    _ = context.sleep(Duration::from_millis(100)) => {
505                        continue;
506                    },
507                };
508            }
509
510            // Populate channel
511            sender.send(0).await.unwrap();
512            sender.send(1).await.unwrap();
513
514            // Prefer not reading channel without losing messages
515            select! {
516                _ = async {} => {
517                    // Skip reading from channel even though populated
518                },
519                v = receiver.next() => {
520                    panic!("unexpected value: {:?}", v);
521                },
522            };
523
524            // Process messages
525            for i in 0..2 {
526                select! {
527                    _ = context.sleep(Duration::from_millis(100)) => {
528                        panic!("timeout");
529                    },
530                    v = receiver.next() => {
531                        assert_eq!(v.unwrap(), i);
532                    },
533                };
534            }
535        });
536    }
537
538    fn test_storage_operations<R: Runner>(runner: R)
539    where
540        R::Context: Storage,
541    {
542        runner.start(|context| async move {
543            let partition = "test_partition";
544            let name = b"test_blob";
545
546            // Open a new blob
547            let (blob, _) = context
548                .open(partition, name)
549                .await
550                .expect("Failed to open blob");
551
552            // Write data to the blob
553            let data = b"Hello, Storage!";
554            blob.write_at(data, 0)
555                .await
556                .expect("Failed to write to blob");
557
558            // Sync the blob
559            blob.sync().await.expect("Failed to sync blob");
560
561            // Read data from the blob
562            let mut buffer = vec![0u8; data.len()];
563            blob.read_at(&mut buffer, 0)
564                .await
565                .expect("Failed to read from blob");
566            assert_eq!(&buffer, data);
567
568            // Close the blob
569            blob.close().await.expect("Failed to close blob");
570
571            // Scan blobs in the partition
572            let blobs = context
573                .scan(partition)
574                .await
575                .expect("Failed to scan partition");
576            assert!(blobs.contains(&name.to_vec()));
577
578            // Reopen the blob
579            let (blob, len) = context
580                .open(partition, name)
581                .await
582                .expect("Failed to reopen blob");
583            assert_eq!(len, data.len() as u64);
584
585            // Read data part of message back
586            let mut buffer = vec![0u8; 7];
587            blob.read_at(&mut buffer, 7)
588                .await
589                .expect("Failed to read data");
590            assert_eq!(&buffer, b"Storage");
591
592            // Close the blob
593            blob.close().await.expect("Failed to close blob");
594
595            // Remove the blob
596            context
597                .remove(partition, Some(name))
598                .await
599                .expect("Failed to remove blob");
600
601            // Ensure the blob is removed
602            let blobs = context
603                .scan(partition)
604                .await
605                .expect("Failed to scan partition");
606            assert!(!blobs.contains(&name.to_vec()));
607
608            // Remove the partition
609            context
610                .remove(partition, None)
611                .await
612                .expect("Failed to remove partition");
613
614            // Scan the partition
615            let result = context.scan(partition).await;
616            assert!(matches!(result, Err(Error::PartitionMissing(_))));
617        });
618    }
619
620    fn test_blob_read_write<R: Runner>(runner: R)
621    where
622        R::Context: Storage,
623    {
624        runner.start(|context| async move {
625            let partition = "test_partition";
626            let name = b"test_blob_rw";
627
628            // Open a new blob
629            let (blob, _) = context
630                .open(partition, name)
631                .await
632                .expect("Failed to open blob");
633
634            // Write data at different offsets
635            let data1 = b"Hello";
636            let data2 = b"World";
637            blob.write_at(data1, 0)
638                .await
639                .expect("Failed to write data1");
640            blob.write_at(data2, 5)
641                .await
642                .expect("Failed to write data2");
643
644            // Read data back
645            let mut buffer = vec![0u8; 10];
646            blob.read_at(&mut buffer, 0)
647                .await
648                .expect("Failed to read data");
649            assert_eq!(&buffer[..5], data1);
650            assert_eq!(&buffer[5..], data2);
651
652            // Rewrite data without affecting length
653            let data3 = b"Store";
654            blob.write_at(data3, 5)
655                .await
656                .expect("Failed to write data3");
657
658            // Truncate the blob
659            blob.truncate(5).await.expect("Failed to truncate blob");
660            let mut buffer = vec![0u8; 5];
661            blob.read_at(&mut buffer, 0)
662                .await
663                .expect("Failed to read data");
664            assert_eq!(&buffer[..5], data1);
665
666            // Full read after truncation
667            let mut buffer = vec![0u8; 10];
668            let result = blob.read_at(&mut buffer, 0).await;
669            assert!(result.is_err());
670
671            // Close the blob
672            blob.close().await.expect("Failed to close blob");
673        });
674    }
675
676    fn test_many_partition_read_write<R: Runner>(runner: R)
677    where
678        R::Context: Storage,
679    {
680        runner.start(|context| async move {
681            let partitions = ["partition1", "partition2", "partition3"];
682            let name = b"test_blob_rw";
683            let data1 = b"Hello";
684            let data2 = b"World";
685
686            for (additional, partition) in partitions.iter().enumerate() {
687                // Open a new blob
688                let (blob, _) = context
689                    .open(partition, name)
690                    .await
691                    .expect("Failed to open blob");
692
693                // Write data at different offsets
694                blob.write_at(data1, 0)
695                    .await
696                    .expect("Failed to write data1");
697                blob.write_at(data2, 5 + additional as u64)
698                    .await
699                    .expect("Failed to write data2");
700
701                // Close the blob
702                blob.close().await.expect("Failed to close blob");
703            }
704
705            for (additional, partition) in partitions.iter().enumerate() {
706                // Open a new blob
707                let (blob, len) = context
708                    .open(partition, name)
709                    .await
710                    .expect("Failed to open blob");
711                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
712
713                // Read data back
714                let mut buffer = vec![0u8; 10 + additional];
715                blob.read_at(&mut buffer, 0)
716                    .await
717                    .expect("Failed to read data");
718                assert_eq!(&buffer[..5], b"Hello");
719                assert_eq!(&buffer[5 + additional..], b"World");
720
721                // Close the blob
722                blob.close().await.expect("Failed to close blob");
723            }
724        });
725    }
726
727    fn test_blob_read_past_length<R: Runner>(runner: R)
728    where
729        R::Context: Storage,
730    {
731        runner.start(|context| async move {
732            let partition = "test_partition";
733            let name = b"test_blob_rw";
734
735            // Open a new blob
736            let (blob, _) = context
737                .open(partition, name)
738                .await
739                .expect("Failed to open blob");
740
741            // Read data past file length (empty file)
742            let mut buffer = vec![0u8; 10];
743            let result = blob.read_at(&mut buffer, 0).await;
744            assert!(result.is_err());
745
746            // Write data to the blob
747            let data = b"Hello, Storage!";
748            blob.write_at(data, 0)
749                .await
750                .expect("Failed to write to blob");
751
752            // Read data past file length (non-empty file)
753            let mut buffer = vec![0u8; 20];
754            let result = blob.read_at(&mut buffer, 0).await;
755            assert!(result.is_err());
756        })
757    }
758
759    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
760    where
761        R::Context: Spawner + Storage + Metrics,
762    {
763        runner.start(|context| async move {
764            let partition = "test_partition";
765            let name = b"test_blob_rw";
766
767            // Open a new blob
768            let (blob, _) = context
769                .open(partition, name)
770                .await
771                .expect("Failed to open blob");
772
773            // Write data to the blob
774            let data = b"Hello, Storage!";
775            blob.write_at(data, 0)
776                .await
777                .expect("Failed to write to blob");
778
779            // Sync the blob
780            blob.sync().await.expect("Failed to sync blob");
781
782            // Read data from the blob in clone
783            let check1 = context.with_label("check1").spawn({
784                let blob = blob.clone();
785                move |_| async move {
786                    let mut buffer = vec![0u8; data.len()];
787                    blob.read_at(&mut buffer, 0)
788                        .await
789                        .expect("Failed to read from blob");
790                    assert_eq!(&buffer, data);
791                }
792            });
793            let check2 = context.with_label("check2").spawn({
794                let blob = blob.clone();
795                move |_| async move {
796                    let mut buffer = vec![0u8; data.len()];
797                    blob.read_at(&mut buffer, 0)
798                        .await
799                        .expect("Failed to read from blob");
800                    assert_eq!(&buffer, data);
801                }
802            });
803
804            // Wait for both reads to complete
805            let result = join!(check1, check2);
806            assert!(result.0.is_ok());
807            assert!(result.1.is_ok());
808
809            // Read data from the blob
810            let mut buffer = vec![0u8; data.len()];
811            blob.read_at(&mut buffer, 0)
812                .await
813                .expect("Failed to read from blob");
814            assert_eq!(&buffer, data);
815
816            // Close the blob
817            blob.close().await.expect("Failed to close blob");
818
819            // Ensure no blobs still open
820            let buffer = context.encode();
821            assert!(buffer.contains("open_blobs 0"));
822        });
823    }
824
825    fn test_shutdown<R: Runner>(runner: R)
826    where
827        R::Context: Spawner + Metrics + Clock,
828    {
829        let kill = 9;
830        runner.start(|context| async move {
831            // Spawn a task that waits for signal
832            let before = context
833                .with_label("before")
834                .spawn(move |context| async move {
835                    let sig = context.stopped().await;
836                    assert_eq!(sig.unwrap(), kill);
837                });
838
839            // Spawn a task after stop is called
840            let after = context
841                .with_label("after")
842                .spawn(move |context| async move {
843                    // Wait for stop signal
844                    let mut signal = context.stopped();
845                    loop {
846                        select! {
847                            sig = &mut signal => {
848                                // Stopper resolved
849                                assert_eq!(sig.unwrap(), kill);
850                                break;
851                            },
852                            _ = context.sleep(Duration::from_millis(10)) => {
853                                // Continue waiting
854                            },
855                        }
856                    }
857                });
858
859            // Sleep for a bit before stopping
860            context.sleep(Duration::from_millis(50)).await;
861
862            // Signal the task
863            context.stop(kill);
864
865            // Ensure both tasks complete
866            let result = join!(before, after);
867            assert!(result.0.is_ok());
868            assert!(result.1.is_ok());
869        });
870    }
871
872    fn test_spawn_ref<R: Runner>(runner: R)
873    where
874        R::Context: Spawner,
875    {
876        runner.start(|mut context| async move {
877            let handle = context.spawn_ref();
878            let result = handle(async move { 42 }).await;
879            assert!(matches!(result, Ok(42)));
880        });
881    }
882
883    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
884    where
885        R::Context: Spawner,
886    {
887        runner.start(|mut context| async move {
888            let handle = context.spawn_ref();
889            let result = handle(async move { 42 }).await;
890            assert!(matches!(result, Ok(42)));
891
892            // Ensure context is consumed
893            let handle = context.spawn_ref();
894            let result = handle(async move { 42 }).await;
895            assert!(matches!(result, Ok(42)));
896        });
897    }
898
899    fn test_spawn_duplicate<R: Runner>(runner: R)
900    where
901        R::Context: Spawner,
902    {
903        runner.start(|mut context| async move {
904            let handle = context.spawn_ref();
905            let result = handle(async move { 42 }).await;
906            assert!(matches!(result, Ok(42)));
907
908            // Ensure context is consumed
909            context.spawn(|_| async move { 42 });
910        });
911    }
912
913    fn test_spawn_blocking<R: Runner>(runner: R)
914    where
915        R::Context: Spawner,
916    {
917        runner.start(|context| async move {
918            let handle = context.spawn_blocking(|| 42);
919            let result = handle.await;
920            assert!(matches!(result, Ok(42)));
921        });
922    }
923
924    fn test_spawn_blocking_abort<R: Runner>(runner: R)
925    where
926        R::Context: Spawner,
927    {
928        runner.start(|context| async move {
929            // Create task
930            let (sender, mut receiver) = oneshot::channel();
931            let handle = context.spawn_blocking(move || {
932                // Wait for abort to be called
933                loop {
934                    if receiver.try_recv().is_ok() {
935                        break;
936                    }
937                }
938
939                // Perform a long-running operation
940                let mut count = 0;
941                loop {
942                    count += 1;
943                    if count >= 100_000_000 {
944                        break;
945                    }
946                }
947                count
948            });
949
950            // Abort the task
951            //
952            // If there was an `.await` prior to sending a message over the oneshot, this test
953            // could deadlock (depending on the runtime implementation) because the blocking task
954            // would never yield (preventing send from being called).
955            handle.abort();
956            sender.send(()).unwrap();
957
958            // Wait for the task to complete
959            assert!(matches!(handle.await, Ok(100_000_000)));
960        });
961    }
962
963    fn test_metrics<R: Runner>(runner: R)
964    where
965        R::Context: Metrics,
966    {
967        runner.start(|context| async move {
968            // Assert label
969            assert_eq!(context.label(), "");
970
971            // Register a metric
972            let counter = Counter::<u64>::default();
973            context.register("test", "test", counter.clone());
974
975            // Increment the counter
976            counter.inc();
977
978            // Encode metrics
979            let buffer = context.encode();
980            assert!(buffer.contains("test_total 1"));
981
982            // Nested context
983            let context = context.with_label("nested");
984            let nested_counter = Counter::<u64>::default();
985            context.register("test", "test", nested_counter.clone());
986
987            // Increment the counter
988            nested_counter.inc();
989
990            // Encode metrics
991            let buffer = context.encode();
992            assert!(buffer.contains("nested_test_total 1"));
993            assert!(buffer.contains("test_total 1"));
994        });
995    }
996
997    fn test_metrics_label<R: Runner>(runner: R)
998    where
999        R::Context: Metrics,
1000    {
1001        runner.start(|context| async move {
1002            context.with_label(METRICS_PREFIX);
1003        })
1004    }
1005
1006    #[test]
1007    fn test_deterministic_future() {
1008        let runner = deterministic::Runner::default();
1009        test_error_future(runner);
1010    }
1011
1012    #[test]
1013    fn test_deterministic_clock_sleep() {
1014        let executor = deterministic::Runner::default();
1015        test_clock_sleep(executor);
1016    }
1017
1018    #[test]
1019    fn test_deterministic_clock_sleep_until() {
1020        let executor = deterministic::Runner::default();
1021        test_clock_sleep_until(executor);
1022    }
1023
1024    #[test]
1025    fn test_deterministic_root_finishes() {
1026        let executor = deterministic::Runner::default();
1027        test_root_finishes(executor);
1028    }
1029
1030    #[test]
1031    fn test_deterministic_spawn_abort() {
1032        let executor = deterministic::Runner::default();
1033        test_spawn_abort(executor);
1034    }
1035
1036    #[test]
1037    fn test_deterministic_panic_aborts_root() {
1038        let runner = deterministic::Runner::default();
1039        test_panic_aborts_root(runner);
1040    }
1041
1042    #[test]
1043    #[should_panic(expected = "blah")]
1044    fn test_deterministic_panic_aborts_spawn() {
1045        let executor = deterministic::Runner::default();
1046        test_panic_aborts_spawn(executor);
1047    }
1048
1049    #[test]
1050    fn test_deterministic_select() {
1051        let executor = deterministic::Runner::default();
1052        test_select(executor);
1053    }
1054
1055    #[test]
1056    fn test_deterministic_select_loop() {
1057        let executor = deterministic::Runner::default();
1058        test_select_loop(executor);
1059    }
1060
1061    #[test]
1062    fn test_deterministic_storage_operations() {
1063        let executor = deterministic::Runner::default();
1064        test_storage_operations(executor);
1065    }
1066
1067    #[test]
1068    fn test_deterministic_blob_read_write() {
1069        let executor = deterministic::Runner::default();
1070        test_blob_read_write(executor);
1071    }
1072
1073    #[test]
1074    fn test_deterministic_many_partition_read_write() {
1075        let executor = deterministic::Runner::default();
1076        test_many_partition_read_write(executor);
1077    }
1078
1079    #[test]
1080    fn test_deterministic_blob_read_past_length() {
1081        let executor = deterministic::Runner::default();
1082        test_blob_read_past_length(executor);
1083    }
1084
1085    #[test]
1086    fn test_deterministic_blob_clone_and_concurrent_read() {
1087        // Run test
1088        let executor = deterministic::Runner::default();
1089        test_blob_clone_and_concurrent_read(executor);
1090    }
1091
1092    #[test]
1093    fn test_deterministic_shutdown() {
1094        let executor = deterministic::Runner::default();
1095        test_shutdown(executor);
1096    }
1097
1098    #[test]
1099    fn test_deterministic_spawn_ref() {
1100        let executor = deterministic::Runner::default();
1101        test_spawn_ref(executor);
1102    }
1103
1104    #[test]
1105    #[should_panic]
1106    fn test_deterministic_spawn_ref_duplicate() {
1107        let executor = deterministic::Runner::default();
1108        test_spawn_ref_duplicate(executor);
1109    }
1110
1111    #[test]
1112    #[should_panic]
1113    fn test_deterministic_spawn_duplicate() {
1114        let executor = deterministic::Runner::default();
1115        test_spawn_duplicate(executor);
1116    }
1117
1118    #[test]
1119    fn test_deterministic_spawn_blocking() {
1120        let executor = deterministic::Runner::default();
1121        test_spawn_blocking(executor);
1122    }
1123
1124    #[test]
1125    #[should_panic(expected = "blocking task panicked")]
1126    fn test_deterministic_spawn_blocking_panic() {
1127        let executor = deterministic::Runner::default();
1128        executor.start(|context| async move {
1129            let handle = context.spawn_blocking(|| {
1130                panic!("blocking task panicked");
1131            });
1132            handle.await.unwrap();
1133        });
1134    }
1135
1136    #[test]
1137    fn test_deterministic_spawn_blocking_abort() {
1138        let executor = deterministic::Runner::default();
1139        test_spawn_blocking_abort(executor);
1140    }
1141
1142    #[test]
1143    fn test_deterministic_metrics() {
1144        let executor = deterministic::Runner::default();
1145        test_metrics(executor);
1146    }
1147
1148    #[test]
1149    #[should_panic]
1150    fn test_deterministic_metrics_label() {
1151        let executor = deterministic::Runner::default();
1152        test_metrics_label(executor);
1153    }
1154
1155    #[test]
1156    fn test_tokio_error_future() {
1157        let runner = tokio::Runner::default();
1158        test_error_future(runner);
1159    }
1160
1161    #[test]
1162    fn test_tokio_clock_sleep() {
1163        let executor = tokio::Runner::default();
1164        test_clock_sleep(executor);
1165    }
1166
1167    #[test]
1168    fn test_tokio_clock_sleep_until() {
1169        let executor = tokio::Runner::default();
1170        test_clock_sleep_until(executor);
1171    }
1172
1173    #[test]
1174    fn test_tokio_root_finishes() {
1175        let executor = tokio::Runner::default();
1176        test_root_finishes(executor);
1177    }
1178
1179    #[test]
1180    fn test_tokio_spawn_abort() {
1181        let executor = tokio::Runner::default();
1182        test_spawn_abort(executor);
1183    }
1184
1185    #[test]
1186    fn test_tokio_panic_aborts_root() {
1187        let executor = tokio::Runner::default();
1188        test_panic_aborts_root(executor);
1189    }
1190
1191    #[test]
1192    fn test_tokio_panic_aborts_spawn() {
1193        let executor = tokio::Runner::default();
1194        test_panic_aborts_spawn(executor);
1195    }
1196
1197    #[test]
1198    fn test_tokio_select() {
1199        let executor = tokio::Runner::default();
1200        test_select(executor);
1201    }
1202
1203    #[test]
1204    fn test_tokio_select_loop() {
1205        let executor = tokio::Runner::default();
1206        test_select_loop(executor);
1207    }
1208
1209    #[test]
1210    fn test_tokio_storage_operations() {
1211        let executor = tokio::Runner::default();
1212        test_storage_operations(executor);
1213    }
1214
1215    #[test]
1216    fn test_tokio_blob_read_write() {
1217        let executor = tokio::Runner::default();
1218        test_blob_read_write(executor);
1219    }
1220
1221    #[test]
1222    fn test_tokio_many_partition_read_write() {
1223        let executor = tokio::Runner::default();
1224        test_many_partition_read_write(executor);
1225    }
1226
1227    #[test]
1228    fn test_tokio_blob_read_past_length() {
1229        let executor = tokio::Runner::default();
1230        test_blob_read_past_length(executor);
1231    }
1232
1233    #[test]
1234    fn test_tokio_blob_clone_and_concurrent_read() {
1235        // Run test
1236        let executor = tokio::Runner::default();
1237        test_blob_clone_and_concurrent_read(executor);
1238    }
1239
1240    #[test]
1241    fn test_tokio_shutdown() {
1242        let executor = tokio::Runner::default();
1243        test_shutdown(executor);
1244    }
1245
1246    #[test]
1247    fn test_tokio_spawn_ref() {
1248        let executor = tokio::Runner::default();
1249        test_spawn_ref(executor);
1250    }
1251
1252    #[test]
1253    #[should_panic]
1254    fn test_tokio_spawn_ref_duplicate() {
1255        let executor = tokio::Runner::default();
1256        test_spawn_ref_duplicate(executor);
1257    }
1258
1259    #[test]
1260    #[should_panic]
1261    fn test_tokio_spawn_duplicate() {
1262        let executor = tokio::Runner::default();
1263        test_spawn_duplicate(executor);
1264    }
1265
1266    #[test]
1267    fn test_tokio_spawn_blocking() {
1268        let executor = tokio::Runner::default();
1269        test_spawn_blocking(executor);
1270    }
1271
1272    #[test]
1273    fn test_tokio_spawn_blocking_panic() {
1274        let executor = tokio::Runner::default();
1275        executor.start(|context| async move {
1276            let handle = context.spawn_blocking(|| {
1277                panic!("blocking task panicked");
1278            });
1279            let result = handle.await;
1280            assert!(matches!(result, Err(Error::Exited)));
1281        });
1282    }
1283
1284    #[test]
1285    fn test_tokio_spawn_blocking_abort() {
1286        let executor = tokio::Runner::default();
1287        test_spawn_blocking_abort(executor);
1288    }
1289
1290    #[test]
1291    fn test_tokio_metrics() {
1292        let executor = tokio::Runner::default();
1293        test_metrics(executor);
1294    }
1295
1296    #[test]
1297    #[should_panic]
1298    fn test_tokio_metrics_label() {
1299        let executor = tokio::Runner::default();
1300        test_metrics_label(executor);
1301    }
1302
1303    #[test]
1304    fn test_tokio_telemetry() {
1305        let executor = tokio::Runner::default();
1306        executor.start(|context| async move {
1307            // Define the server address
1308            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1309
1310            // Configure telemetry
1311            tokio::telemetry::init(
1312                context.with_label("metrics"),
1313                tokio::telemetry::Logging {
1314                    level: Level::INFO,
1315                    json: false,
1316                },
1317                Some(address),
1318                None,
1319            );
1320
1321            // Register a test metric
1322            let counter: Counter<u64> = Counter::default();
1323            context.register("test_counter", "Test counter", counter.clone());
1324            counter.inc();
1325
1326            // Helper functions to parse HTTP response
1327            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1328                let mut line = Vec::new();
1329                loop {
1330                    let mut byte = [0; 1];
1331                    stream.recv(&mut byte).await?;
1332                    if byte[0] == b'\n' {
1333                        if line.last() == Some(&b'\r') {
1334                            line.pop(); // Remove trailing \r
1335                        }
1336                        break;
1337                    }
1338                    line.push(byte[0]);
1339                }
1340                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1341            }
1342
1343            async fn read_headers<St: Stream>(
1344                stream: &mut St,
1345            ) -> Result<HashMap<String, String>, Error> {
1346                let mut headers = HashMap::new();
1347                loop {
1348                    let line = read_line(stream).await?;
1349                    if line.is_empty() {
1350                        break;
1351                    }
1352                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1353                    if parts.len() == 2 {
1354                        headers.insert(parts[0].to_string(), parts[1].to_string());
1355                    }
1356                }
1357                Ok(headers)
1358            }
1359
1360            async fn read_body<St: Stream>(
1361                stream: &mut St,
1362                content_length: usize,
1363            ) -> Result<String, Error> {
1364                let mut body = vec![0; content_length];
1365                stream.recv(&mut body).await?;
1366                String::from_utf8(body).map_err(|_| Error::ReadFailed)
1367            }
1368
1369            // Simulate a client connecting to the server
1370            let client_handle = context
1371                .with_label("client")
1372                .spawn(move |context| async move {
1373                    let (mut sink, mut stream) = loop {
1374                        match context.dial(address).await {
1375                            Ok((sink, stream)) => break (sink, stream),
1376                            Err(e) => {
1377                                // The client may be polled before the server is ready, that's alright!
1378                                error!(err =?e, "failed to connect");
1379                                context.sleep(Duration::from_millis(10)).await;
1380                            }
1381                        }
1382                    };
1383
1384                    // Send a GET request to the server
1385                    let request = format!(
1386                        "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1387                        address
1388                    );
1389                    sink.send(request.as_bytes()).await.unwrap();
1390
1391                    // Read and verify the HTTP status line
1392                    let status_line = read_line(&mut stream).await.unwrap();
1393                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1394
1395                    // Read and parse headers
1396                    let headers = read_headers(&mut stream).await.unwrap();
1397                    println!("Headers: {:?}", headers);
1398                    let content_length = headers
1399                        .get("content-length")
1400                        .unwrap()
1401                        .parse::<usize>()
1402                        .unwrap();
1403
1404                    // Read and verify the body
1405                    let body = read_body(&mut stream, content_length).await.unwrap();
1406                    assert!(body.contains("test_counter_total 1"));
1407                });
1408
1409            // Wait for the client task to complete
1410            client_handle.await.unwrap();
1411        });
1412    }
1413}