commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::{
22    future::Future,
23    net::SocketAddr,
24    time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
// Runtime with deterministic task execution for a fixed seed (testing and simulation).
pub mod deterministic;
pub mod mocks;
cfg_if::cfg_if! {
    // The Tokio-backed runtime is only compiled for targets other than `wasm32`.
    if #[cfg(not(target_arch = "wasm32"))] {
        pub mod tokio;
    }
}
pub mod telemetry;
// `utils` stays private; the items re-exported below are its public surface.
mod utils;
pub use utils::{reschedule, Handle, Signal, Signaler};
38
/// Prefix for runtime metrics.
///
/// This value is reserved for metrics that belong to the runtime itself: the
/// [`Metrics::with_label`] contract forbids labels that start with it.
const METRICS_PREFIX: &str = "runtime";
41
/// Errors that can occur when interacting with the runtime.
///
/// `PartialEq` is derived so results can be compared directly (the tests in this
/// file rely on `assert_eq!(handle.await, Err(Error::Closed))` and similar).
///
/// For the variants carrying strings, the payload is only interpolated into the
/// display message; presumably `{0}` is the partition name and `{1}` the blob
/// name — confirm against the storage implementations.
#[derive(Error, Debug, PartialEq)]
pub enum Error {
    /// The tests in this file expect this from the handle of a spawned task that
    /// panicked.
    #[error("exited")]
    Exited,
    /// The tests in this file expect this from the handle of a task that was
    /// aborted.
    #[error("closed")]
    Closed,
    /// An operation timed out.
    #[error("timeout")]
    Timeout,
    /// Binding a listener failed.
    #[error("bind failed")]
    BindFailed,
    /// Establishing a connection failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// Sending a message failed.
    #[error("send failed")]
    SendFailed,
    /// Receiving a message failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist (the tests in this file expect this when
    /// scanning a partition that has been removed).
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// A blob could not be opened.
    #[error("blob open failed: {0}/{1}")]
    BlobOpenFailed(String, String),
    /// A blob does not exist.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// A blob could not be truncated.
    #[error("blob truncate failed: {0}/{1}")]
    BlobTruncateFailed(String, String),
    /// A blob could not be synced.
    #[error("blob sync failed: {0}/{1}")]
    BlobSyncFailed(String, String),
    /// A blob could not be closed.
    #[error("blob close failed: {0}/{1}")]
    BlobCloseFailed(String, String),
    /// A blob is shorter than an operation requires (the tests in this file
    /// expect this when a read extends past the end of the blob).
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
}
84
/// Interface that any task scheduler must implement to start
/// running tasks.
pub trait Runner {
    /// Start running a root task.
    ///
    /// Consumes the runner and returns the output of the root future `f`.
    ///
    /// The root task does not create the initial context because it can be useful to have a reference
    /// to context before starting task execution.
    fn start<F>(self, f: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static;
}
97
/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Enqueue a task to be executed.
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// Spawned tasks consume the context used to create them. This ensures that context cannot
    /// be shared between tasks and that a task's context always comes from somewhere.
    ///
    /// The closure `f` receives the consumed context and returns the future to run; the
    /// returned [`Handle`] resolves to that future's output.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a task to be executed (without consuming the context).
    ///
    /// Returns a closure that, when called with a future, spawns it and returns its
    /// [`Handle`].
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
    /// an actor that already has a reference to context).
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
    /// to prevent accidental misuse.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Signals the runtime to stop execution and that all outstanding tasks
    /// should perform any required cleanup and exit. This method is idempotent and
    /// can be called multiple times.
    ///
    /// This method does not actually kill any tasks but rather signals to them, using
    /// the `Signal` returned by `stopped`, that they should exit.
    ///
    /// The tests in this file expect `value` to be what a task obtains when it awaits
    /// that `Signal`.
    fn stop(&self, value: i32);

    /// Returns an instance of a `Signal` that resolves when `stop` is called by
    /// any task.
    ///
    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
    fn stopped(&self) -> Signal;
}
144
/// Interface to register and encode metrics.
pub trait Metrics: Clone + Send + Sync + 'static {
    /// Create a new instance of `Metrics` with the given label appended to the end
    /// of the current `Metrics` label.
    ///
    /// This is commonly used to create a nested context for `register`.
    ///
    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
    /// label (reserved for metrics for the runtime).
    fn with_label(&self, label: &str) -> Self;

    /// Get the current label of the context.
    ///
    /// The tests in this file expect the initial context to report an empty label.
    fn label(&self) -> String;

    /// Register a metric with the runtime.
    ///
    /// Any registered metric will include (as a prefix) the label of the current context.
    /// For example, the tests in this file expect a counter registered as `test` on a
    /// context labelled `nested` to be encoded as `nested_test_total`.
    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);

    /// Encode all metrics into a buffer.
    ///
    /// The tests in this file expect the output of a nested context to also include
    /// metrics registered on its parent.
    fn encode(&self) -> String;
}
167
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time.
    ///
    /// This is the runtime's notion of time, which need not be the wall clock (the
    /// tests in this file expect the deterministic runtime to start at
    /// `SystemTime::UNIX_EPOCH`).
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    ///
    /// The returned future is `'static`, so it does not borrow from `self`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    ///
    /// The returned future is `'static`, so it does not borrow from `self`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
183
/// Interface that any runtime must implement to create
/// network connections.
///
/// The type parameters name the runtime's concrete [`Listener`] (`L`), [`Sink`]
/// (`Si`), and [`Stream`] (`St`) types.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
    L: Listener<Si, St>,
    Si: Sink,
    St: Stream,
{
    /// Bind to the given socket address.
    ///
    /// On success, resolves to a listener.
    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// On success, resolves to the sink/stream pair for the new connection.
    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
198
/// Interface that any runtime must implement to handle
/// incoming network connections.
pub trait Listener<Si, St>: Sync + Send + 'static
where
    Si: Sink,
    St: Stream,
{
    /// Accept an incoming connection.
    ///
    /// On success, resolves to a socket address (presumably the peer's — confirm against
    /// the implementations) together with the sink/stream pair for the connection.
    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
209
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// Resolves to `Ok(())` on success and to an [`Error`] otherwise.
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
216
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer.
    ///
    /// No byte count is returned: success means `buf` was filled completely.
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
224
/// Interface to interact with storage.
///
/// Storage is organized into named partitions, each holding blobs identified by a
/// byte-string name.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage<B>: Clone + Send + Sync + 'static
where
    B: Blob,
{
    /// Open an existing blob in a given partition or create a new one.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return all blobs in a given partition.
    ///
    /// Resolves to the names of the blobs. The tests in this file expect
    /// `Error::PartitionMissing` when the partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
254
/// Interface to read and write to a blob.
///
/// To support blob implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Cloning a blob is similar to wrapping a single file descriptor in
/// a lock whereas opening a new blob (of the same name) is similar to
/// opening a new file descriptor. If multiple blobs are opened with the same
/// name, they are not expected to coordinate access to underlying storage
/// and writing to both is undefined behavior.
// `len` yields a future of a `Result`, not a plain integer, so no `is_empty`
// companion is provided and the corresponding lint is silenced.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Get the length of the blob.
    ///
    /// The tests in this file expect the length to reflect writes that have not
    /// been synced yet.
    fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;

    /// Read from the blob at the given offset.
    ///
    /// `read_at` does not return the number of bytes read because it
    /// only returns once the entire buffer has been filled.
    ///
    /// The tests in this file expect `Error::BlobInsufficientLength` when the
    /// requested range extends past the end of the blob.
    fn read_at(
        &self,
        buf: &mut [u8],
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Write to the blob at the given offset.
    ///
    /// The tests in this file expect writes past the current end to grow the blob
    /// and writes inside it to leave the length unchanged.
    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Truncate the blob to the given length.
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Ensure all pending data is durably persisted.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Close the blob.
    ///
    /// Consumes this handle.
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use commonware_macros::select;
297    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
298    use prometheus_client::metrics::counter::Counter;
299    use std::panic::{catch_unwind, AssertUnwindSafe};
300    use std::sync::Mutex;
301    use utils::reschedule;
302
303    fn test_error_future(runner: impl Runner) {
304        async fn error_future() -> Result<&'static str, &'static str> {
305            Err("An error occurred")
306        }
307        let result = runner.start(error_future());
308        assert_eq!(result, Err("An error occurred"));
309    }
310
311    fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
312        runner.start(async move {
313            // Capture initial time
314            let start = context.current();
315            let sleep_duration = Duration::from_millis(10);
316            context.sleep(sleep_duration).await;
317
318            // After run, time should have advanced
319            let end = context.current();
320            assert!(end.duration_since(start).unwrap() >= sleep_duration);
321        });
322    }
323
324    fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
325        runner.start(async move {
326            // Trigger sleep
327            let now = context.current();
328            context.sleep_until(now + Duration::from_millis(100)).await;
329
330            // Ensure slept duration has elapsed
331            let elapsed = now.elapsed().unwrap();
332            assert!(elapsed >= Duration::from_millis(100));
333        });
334    }
335
336    fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
337        runner.start(async move {
338            context.spawn(|_| async move {
339                loop {
340                    reschedule().await;
341                }
342            });
343        });
344    }
345
346    fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
347        runner.start(async move {
348            let handle = context.spawn(|_| async move {
349                loop {
350                    reschedule().await;
351                }
352            });
353            handle.abort();
354            assert_eq!(handle.await, Err(Error::Closed));
355        });
356    }
357
358    fn test_panic_aborts_root(runner: impl Runner) {
359        let result = catch_unwind(AssertUnwindSafe(|| {
360            runner.start(async move {
361                panic!("blah");
362            });
363        }));
364        result.unwrap_err();
365    }
366
367    fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
368        let result = runner.start(async move {
369            let result = context.spawn(|_| async move {
370                panic!("blah");
371            });
372            assert_eq!(result.await, Err(Error::Exited));
373            Result::<(), Error>::Ok(())
374        });
375
376        // Ensure panic was caught
377        result.unwrap();
378    }
379
380    fn test_select(runner: impl Runner) {
381        runner.start(async move {
382            // Test first branch
383            let output = Mutex::new(0);
384            select! {
385                v1 = ready(1) => {
386                    *output.lock().unwrap() = v1;
387                },
388                v2 = ready(2) => {
389                    *output.lock().unwrap() = v2;
390                },
391            };
392            assert_eq!(*output.lock().unwrap(), 1);
393
394            // Test second branch
395            select! {
396                v1 = std::future::pending::<i32>() => {
397                    *output.lock().unwrap() = v1;
398                },
399                v2 = ready(2) => {
400                    *output.lock().unwrap() = v2;
401                },
402            };
403            assert_eq!(*output.lock().unwrap(), 2);
404        });
405    }
406
407    /// Ensure future fusing works as expected.
408    fn test_select_loop(runner: impl Runner, context: impl Clock) {
409        runner.start(async move {
410            // Should hit timeout
411            let (mut sender, mut receiver) = mpsc::unbounded();
412            for _ in 0..2 {
413                select! {
414                    v = receiver.next() => {
415                        panic!("unexpected value: {:?}", v);
416                    },
417                    _ = context.sleep(Duration::from_millis(100)) => {
418                        continue;
419                    },
420                };
421            }
422
423            // Populate channel
424            sender.send(0).await.unwrap();
425            sender.send(1).await.unwrap();
426
427            // Prefer not reading channel without losing messages
428            select! {
429                _ = async {} => {
430                    // Skip reading from channel even though populated
431                },
432                v = receiver.next() => {
433                    panic!("unexpected value: {:?}", v);
434                },
435            };
436
437            // Process messages
438            for i in 0..2 {
439                select! {
440                    _ = context.sleep(Duration::from_millis(100)) => {
441                        panic!("timeout");
442                    },
443                    v = receiver.next() => {
444                        assert_eq!(v.unwrap(), i);
445                    },
446                };
447            }
448        });
449    }
450
451    fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
452    where
453        B: Blob,
454    {
455        runner.start(async move {
456            let partition = "test_partition";
457            let name = b"test_blob";
458
459            // Open a new blob
460            let blob = context
461                .open(partition, name)
462                .await
463                .expect("Failed to open blob");
464
465            // Write data to the blob
466            let data = b"Hello, Storage!";
467            blob.write_at(data, 0)
468                .await
469                .expect("Failed to write to blob");
470
471            // Sync the blob
472            blob.sync().await.expect("Failed to sync blob");
473
474            // Read data from the blob
475            let mut buffer = vec![0u8; data.len()];
476            blob.read_at(&mut buffer, 0)
477                .await
478                .expect("Failed to read from blob");
479            assert_eq!(&buffer, data);
480
481            // Get blob length
482            let length = blob.len().await.expect("Failed to get blob length");
483            assert_eq!(length, data.len() as u64);
484
485            // Close the blob
486            blob.close().await.expect("Failed to close blob");
487
488            // Scan blobs in the partition
489            let blobs = context
490                .scan(partition)
491                .await
492                .expect("Failed to scan partition");
493            assert!(blobs.contains(&name.to_vec()));
494
495            // Reopen the blob
496            let blob = context
497                .open(partition, name)
498                .await
499                .expect("Failed to reopen blob");
500
501            // Read data part of message back
502            let mut buffer = vec![0u8; 7];
503            blob.read_at(&mut buffer, 7)
504                .await
505                .expect("Failed to read data");
506            assert_eq!(&buffer, b"Storage");
507
508            // Close the blob
509            blob.close().await.expect("Failed to close blob");
510
511            // Remove the blob
512            context
513                .remove(partition, Some(name))
514                .await
515                .expect("Failed to remove blob");
516
517            // Ensure the blob is removed
518            let blobs = context
519                .scan(partition)
520                .await
521                .expect("Failed to scan partition");
522            assert!(!blobs.contains(&name.to_vec()));
523
524            // Remove the partition
525            context
526                .remove(partition, None)
527                .await
528                .expect("Failed to remove partition");
529
530            // Scan the partition
531            let result = context.scan(partition).await;
532            assert!(matches!(result, Err(Error::PartitionMissing(_))));
533        });
534    }
535
536    fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
537    where
538        B: Blob,
539    {
540        runner.start(async move {
541            let partition = "test_partition";
542            let name = b"test_blob_rw";
543
544            // Open a new blob
545            let blob = context
546                .open(partition, name)
547                .await
548                .expect("Failed to open blob");
549
550            // Write data at different offsets
551            let data1 = b"Hello";
552            let data2 = b"World";
553            blob.write_at(data1, 0)
554                .await
555                .expect("Failed to write data1");
556            blob.write_at(data2, 5)
557                .await
558                .expect("Failed to write data2");
559
560            // Assert that length tracks pending data
561            let length = blob.len().await.expect("Failed to get blob length");
562            assert_eq!(length, 10);
563
564            // Read data back
565            let mut buffer = vec![0u8; 10];
566            blob.read_at(&mut buffer, 0)
567                .await
568                .expect("Failed to read data");
569            assert_eq!(&buffer[..5], data1);
570            assert_eq!(&buffer[5..], data2);
571
572            // Rewrite data without affecting length
573            let data3 = b"Store";
574            blob.write_at(data3, 5)
575                .await
576                .expect("Failed to write data3");
577            let length = blob.len().await.expect("Failed to get blob length");
578            assert_eq!(length, 10);
579
580            // Truncate the blob
581            blob.truncate(5).await.expect("Failed to truncate blob");
582            let length = blob.len().await.expect("Failed to get blob length");
583            assert_eq!(length, 5);
584            let mut buffer = vec![0u8; 5];
585            blob.read_at(&mut buffer, 0)
586                .await
587                .expect("Failed to read data");
588            assert_eq!(&buffer[..5], data1);
589
590            // Full read after truncation
591            let mut buffer = vec![0u8; 10];
592            let result = blob.read_at(&mut buffer, 0).await;
593            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
594
595            // Close the blob
596            blob.close().await.expect("Failed to close blob");
597        });
598    }
599
600    fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
601    where
602        B: Blob,
603    {
604        runner.start(async move {
605            let partitions = ["partition1", "partition2", "partition3"];
606            let name = b"test_blob_rw";
607
608            for (additional, partition) in partitions.iter().enumerate() {
609                // Open a new blob
610                let blob = context
611                    .open(partition, name)
612                    .await
613                    .expect("Failed to open blob");
614
615                // Write data at different offsets
616                let data1 = b"Hello";
617                let data2 = b"World";
618                blob.write_at(data1, 0)
619                    .await
620                    .expect("Failed to write data1");
621                blob.write_at(data2, 5 + additional as u64)
622                    .await
623                    .expect("Failed to write data2");
624
625                // Close the blob
626                blob.close().await.expect("Failed to close blob");
627            }
628
629            for (additional, partition) in partitions.iter().enumerate() {
630                // Open a new blob
631                let blob = context
632                    .open(partition, name)
633                    .await
634                    .expect("Failed to open blob");
635
636                // Read data back
637                let mut buffer = vec![0u8; 10 + additional];
638                blob.read_at(&mut buffer, 0)
639                    .await
640                    .expect("Failed to read data");
641                assert_eq!(&buffer[..5], b"Hello");
642                assert_eq!(&buffer[5 + additional..], b"World");
643
644                // Close the blob
645                blob.close().await.expect("Failed to close blob");
646            }
647        });
648    }
649
650    fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
651    where
652        B: Blob,
653    {
654        runner.start(async move {
655            let partition = "test_partition";
656            let name = b"test_blob_rw";
657
658            // Open a new blob
659            let blob = context
660                .open(partition, name)
661                .await
662                .expect("Failed to open blob");
663
664            // Read data past file length (empty file)
665            let mut buffer = vec![0u8; 10];
666            let result = blob.read_at(&mut buffer, 0).await;
667            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
668
669            // Write data to the blob
670            let data = b"Hello, Storage!";
671            blob.write_at(data, 0)
672                .await
673                .expect("Failed to write to blob");
674
675            // Read data past file length (non-empty file)
676            let mut buffer = vec![0u8; 20];
677            let result = blob.read_at(&mut buffer, 0).await;
678            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
679        })
680    }
681
682    fn test_blob_clone_and_concurrent_read<B>(
683        runner: impl Runner,
684        context: impl Spawner + Storage<B> + Metrics,
685    ) where
686        B: Blob,
687    {
688        runner.start(async move {
689            let partition = "test_partition";
690            let name = b"test_blob_rw";
691
692            // Open a new blob
693            let blob = context
694                .open(partition, name)
695                .await
696                .expect("Failed to open blob");
697
698            // Write data to the blob
699            let data = b"Hello, Storage!";
700            blob.write_at(data, 0)
701                .await
702                .expect("Failed to write to blob");
703
704            // Sync the blob
705            blob.sync().await.expect("Failed to sync blob");
706
707            // Read data from the blob in clone
708            let check1 = context.with_label("check1").spawn({
709                let blob = blob.clone();
710                move |_| async move {
711                    let mut buffer = vec![0u8; data.len()];
712                    blob.read_at(&mut buffer, 0)
713                        .await
714                        .expect("Failed to read from blob");
715                    assert_eq!(&buffer, data);
716                }
717            });
718            let check2 = context.with_label("check2").spawn({
719                let blob = blob.clone();
720                move |_| async move {
721                    let mut buffer = vec![0u8; data.len()];
722                    blob.read_at(&mut buffer, 0)
723                        .await
724                        .expect("Failed to read from blob");
725                    assert_eq!(&buffer, data);
726                }
727            });
728
729            // Wait for both reads to complete
730            let result = join!(check1, check2);
731            assert!(result.0.is_ok());
732            assert!(result.1.is_ok());
733
734            // Read data from the blob
735            let mut buffer = vec![0u8; data.len()];
736            blob.read_at(&mut buffer, 0)
737                .await
738                .expect("Failed to read from blob");
739            assert_eq!(&buffer, data);
740
741            // Get blob length
742            let length = blob.len().await.expect("Failed to get blob length");
743            assert_eq!(length, data.len() as u64);
744
745            // Close the blob
746            blob.close().await.expect("Failed to close blob");
747        });
748    }
749
750    fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
751        let kill = 9;
752        runner.start(async move {
753            // Spawn a task that waits for signal
754            let before = context
755                .with_label("before")
756                .spawn(move |context| async move {
757                    let sig = context.stopped().await;
758                    assert_eq!(sig.unwrap(), kill);
759                });
760
761            // Spawn a task after stop is called
762            let after = context
763                .with_label("after")
764                .spawn(move |context| async move {
765                    // Wait for stop signal
766                    let mut signal = context.stopped();
767                    loop {
768                        select! {
769                            sig = &mut signal => {
770                                // Stopper resolved
771                                assert_eq!(sig.unwrap(), kill);
772                                break;
773                            },
774                            _ = context.sleep(Duration::from_millis(10)) => {
775                                // Continue waiting
776                            },
777                        }
778                    }
779                });
780
781            // Sleep for a bit before stopping
782            context.sleep(Duration::from_millis(50)).await;
783
784            // Signal the task
785            context.stop(kill);
786
787            // Ensure both tasks complete
788            let result = join!(before, after);
789            assert!(result.0.is_ok());
790            assert!(result.1.is_ok());
791        });
792    }
793
794    fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
795        runner.start(async move {
796            let handle = context.spawn_ref();
797            let result = handle(async move { 42 }).await;
798            assert_eq!(result, Ok(42));
799        });
800    }
801
802    fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
803        runner.start(async move {
804            let handle = context.spawn_ref();
805            let result = handle(async move { 42 }).await;
806            assert_eq!(result, Ok(42));
807
808            // Ensure context is consumed
809            let handle = context.spawn_ref();
810            let result = handle(async move { 42 }).await;
811            assert_eq!(result, Ok(42));
812        });
813    }
814
815    fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
816        runner.start(async move {
817            let handle = context.spawn_ref();
818            let result = handle(async move { 42 }).await;
819            assert_eq!(result, Ok(42));
820
821            // Ensure context is consumed
822            context.spawn(|_| async move { 42 });
823        });
824    }
825
826    fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
827        runner.start(async move {
828            // Assert label
829            assert_eq!(context.label(), "");
830
831            // Register a metric
832            let counter = Counter::<u64>::default();
833            context.register("test", "test", counter.clone());
834
835            // Increment the counter
836            counter.inc();
837
838            // Encode metrics
839            let buffer = context.encode();
840            assert!(buffer.contains("test_total 1"));
841
842            // Nested context
843            let context = context.with_label("nested");
844            let nested_counter = Counter::<u64>::default();
845            context.register("test", "test", nested_counter.clone());
846
847            // Increment the counter
848            nested_counter.inc();
849
850            // Encode metrics
851            let buffer = context.encode();
852            assert!(buffer.contains("nested_test_total 1"));
853            assert!(buffer.contains("test_total 1"));
854        });
855    }
856
857    fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
858        runner.start(async move {
859            context.with_label(METRICS_PREFIX);
860        })
861    }
862
/// Runs the shared `test_error_future` scenario on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    let (executor, _, _) = deterministic::Executor::default();
    test_error_future(executor);
}
868
/// Checks that a fresh deterministic context reports the Unix epoch as the
/// current time, then runs the shared `test_clock_sleep` scenario.
#[test]
fn test_deterministic_clock_sleep() {
    let (runner, context, _) = deterministic::Executor::default();

    // The clock must start at the epoch before anything runs.
    let start = context.current();
    assert_eq!(start, SystemTime::UNIX_EPOCH);

    test_clock_sleep(runner, context);
}
875
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_clock_sleep_until(runner, ctx);
}
881
/// Runs the shared `test_root_finishes` scenario on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_root_finishes(runner, ctx);
}
887
/// Runs the shared `test_spawn_abort` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_abort(runner, ctx);
}
893
/// Runs the shared `test_panic_aborts_root` scenario on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    let (executor, _, _) = deterministic::Executor::default();
    test_panic_aborts_root(executor);
}
899
/// Runs the shared `test_panic_aborts_spawn` scenario on the deterministic
/// runtime. The test is expected to panic with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_panic_aborts_spawn(runner, ctx);
}
906
/// Runs the shared `test_select` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    let (runner, _, _) = deterministic::Executor::default();
    test_select(runner);
}
912
/// Runs the shared `test_select_loop` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_select_loop(runner, ctx);
}
918
/// Runs the shared `test_storage_operations` scenario on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_storage_operations(runner, ctx);
}
924
/// Runs the shared `test_blob_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_blob_read_write(runner, ctx);
}
930
/// Runs the shared `test_many_partition_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_many_partition_read_write(runner, ctx);
}
936
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_blob_read_past_length(runner, ctx);
}
942
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// deterministic runtime, then checks that the encoded metrics report zero
/// open blobs.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    // Run test with a clone so the original context can be inspected afterwards
    let (runner, context, _) = deterministic::Executor::default();
    let scenario_context = context.clone();
    test_blob_clone_and_concurrent_read(runner, scenario_context);

    // Ensure no blobs still open
    let encoded = context.encode();
    assert!(encoded.contains("open_blobs 0"));
}
953
/// Runs the shared `test_shutdown` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_shutdown(runner, ctx);
}
959
/// Runs the shared `test_spawn_ref` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_ref(runner, ctx);
}
965
/// Runs the shared `test_spawn_ref_duplicate` scenario on the deterministic
/// runtime. The test is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_ref_duplicate(runner, ctx);
}
972
/// Runs the shared `test_spawn_duplicate` scenario on the deterministic
/// runtime. The test is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_duplicate(runner, ctx);
}
979
/// Runs the shared `test_metrics` scenario on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_metrics(runner, ctx);
}
985
/// Runs the shared `test_metrics_label` scenario on the deterministic
/// runtime. The test is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_metrics_label(runner, ctx);
}
992
/// Runs the shared `test_error_future` scenario on the Tokio runtime.
#[test]
fn test_tokio_error_future() {
    let (executor, _) = tokio::Executor::default();
    test_error_future(executor);
}
998
/// Runs the shared `test_clock_sleep` scenario on the Tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    let (runner, ctx) = tokio::Executor::default();
    test_clock_sleep(runner, ctx);
}
1004
/// Runs the shared `test_clock_sleep_until` scenario on the Tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    let (runner, ctx) = tokio::Executor::default();
    test_clock_sleep_until(runner, ctx);
}
1010
/// Runs the shared `test_root_finishes` scenario on the Tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    let (runner, ctx) = tokio::Executor::default();
    test_root_finishes(runner, ctx);
}
1016
/// Runs the shared `test_spawn_abort` scenario on the Tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_abort(runner, ctx);
}
1022
/// Runs the shared `test_panic_aborts_root` scenario on the Tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    let (executor, _) = tokio::Executor::default();
    test_panic_aborts_root(executor);
}
1028
/// Runs the shared `test_panic_aborts_spawn` scenario on the Tokio runtime.
///
/// Unlike the deterministic counterpart, this wrapper carries no
/// `#[should_panic]` attribute.
#[test]
fn test_tokio_panic_aborts_spawn() {
    let (runner, ctx) = tokio::Executor::default();
    test_panic_aborts_spawn(runner, ctx);
}
1034
/// Runs the shared `test_select` scenario on the Tokio runtime.
#[test]
fn test_tokio_select() {
    let (runner, _) = tokio::Executor::default();
    test_select(runner);
}
1040
/// Runs the shared `test_select_loop` scenario on the Tokio runtime.
#[test]
fn test_tokio_select_loop() {
    let (runner, ctx) = tokio::Executor::default();
    test_select_loop(runner, ctx);
}
1046
/// Runs the shared `test_storage_operations` scenario on the Tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    let (runner, ctx) = tokio::Executor::default();
    test_storage_operations(runner, ctx);
}
1052
/// Runs the shared `test_blob_read_write` scenario on the Tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    let (runner, ctx) = tokio::Executor::default();
    test_blob_read_write(runner, ctx);
}
1058
/// Runs the shared `test_many_partition_read_write` scenario on the Tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    let (runner, ctx) = tokio::Executor::default();
    test_many_partition_read_write(runner, ctx);
}
1064
/// Runs the shared `test_blob_read_past_length` scenario on the Tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    let (runner, ctx) = tokio::Executor::default();
    test_blob_read_past_length(runner, ctx);
}
1070
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// Tokio runtime, then checks that the encoded metrics report zero open
/// blobs.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    // Run test with a clone so the original context can be inspected afterwards
    let (runner, context) = tokio::Executor::default();
    let scenario_context = context.clone();
    test_blob_clone_and_concurrent_read(runner, scenario_context);

    // Ensure no blobs still open
    let encoded = context.encode();
    assert!(encoded.contains("open_blobs 0"));
}
1081
/// Runs the shared `test_shutdown` scenario on the Tokio runtime.
#[test]
fn test_tokio_shutdown() {
    let (runner, ctx) = tokio::Executor::default();
    test_shutdown(runner, ctx);
}
1087
/// Runs the shared `test_spawn_ref` scenario on the Tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_ref(runner, ctx);
}
1093
/// Runs the shared `test_spawn_ref_duplicate` scenario on the Tokio runtime.
/// The test is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_ref_duplicate(runner, ctx);
}
1100
/// Runs the shared `test_spawn_duplicate` scenario on the Tokio runtime.
/// The test is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_duplicate(runner, ctx);
}
1107
/// Runs the shared `test_metrics` scenario on the Tokio runtime.
#[test]
fn test_tokio_metrics() {
    let (runner, ctx) = tokio::Executor::default();
    test_metrics(runner, ctx);
}
1113
/// Runs the shared `test_metrics_label` scenario on the Tokio runtime.
/// The test is expected to panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    let (runner, ctx) = tokio::Executor::default();
    test_metrics_label(runner, ctx);
}
1120}