commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23    future::Future,
24    net::SocketAddr,
25    time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32    if #[cfg(not(target_arch = "wasm32"))] {
33        pub mod tokio;
34        pub mod benchmarks;
35    }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41    create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
44/// Prefix for runtime metrics.
45const METRICS_PREFIX: &str = "runtime";
46
47/// Errors that can occur when interacting with the runtime.
48#[derive(Error, Debug)]
49pub enum Error {
50    #[error("exited")]
51    Exited,
52    #[error("closed")]
53    Closed,
54    #[error("timeout")]
55    Timeout,
56    #[error("bind failed")]
57    BindFailed,
58    #[error("connection failed")]
59    ConnectionFailed,
60    #[error("write failed")]
61    WriteFailed,
62    #[error("read failed")]
63    ReadFailed,
64    #[error("send failed")]
65    SendFailed,
66    #[error("recv failed")]
67    RecvFailed,
68    #[error("partition creation failed: {0}")]
69    PartitionCreationFailed(String),
70    #[error("partition missing: {0}")]
71    PartitionMissing(String),
72    #[error("partition corrupt: {0}")]
73    PartitionCorrupt(String),
74    #[error("blob open failed: {0}/{1} error: {2}")]
75    BlobOpenFailed(String, String, IoError),
76    #[error("blob missing: {0}/{1}")]
77    BlobMissing(String, String),
78    #[error("blob truncate failed: {0}/{1} error: {2}")]
79    BlobTruncateFailed(String, String, IoError),
80    #[error("blob sync failed: {0}/{1} error: {2}")]
81    BlobSyncFailed(String, String, IoError),
82    #[error("blob close failed: {0}/{1} error: {2}")]
83    BlobCloseFailed(String, String, IoError),
84    #[error("blob insufficient length")]
85    BlobInsufficientLength,
86    #[error("offset overflow")]
87    OffsetOverflow,
88}
89
90/// Interface that any task scheduler must implement to start
91/// running tasks.
92pub trait Runner {
93    /// Context defines the environment available to tasks.
94    /// Example of possible services provided by the context include:
95    /// - [Clock] for time-based operations
96    /// - [Network] for network operations
97    /// - [Storage] for storage operations
98    type Context;
99
100    /// Start running a root task.
101    fn start<F, Fut>(self, f: F) -> Fut::Output
102    where
103        F: FnOnce(Self::Context) -> Fut,
104        Fut: Future;
105}
106
107/// Interface that any task scheduler must implement to spawn tasks.
108pub trait Spawner: Clone + Send + Sync + 'static {
109    /// Enqueue a task to be executed.
110    ///
111    /// Unlike a future, a spawned task will start executing immediately (even if the caller
112    /// does not await the handle).
113    ///
114    /// Spawned tasks consume the context used to create them. This ensures that context cannot
115    /// be shared between tasks and that a task's context always comes from somewhere.
116    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
117    where
118        F: FnOnce(Self) -> Fut + Send + 'static,
119        Fut: Future<Output = T> + Send + 'static,
120        T: Send + 'static;
121
122    /// Enqueue a task to be executed (without consuming the context).
123    ///
124    /// Unlike a future, a spawned task will start executing immediately (even if the caller
125    /// does not await the handle).
126    ///
127    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
128    /// an actor that already has a reference to context).
129    ///
130    /// # Warning
131    ///
132    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
133    /// to prevent accidental misuse.
134    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
135    where
136        F: Future<Output = T> + Send + 'static,
137        T: Send + 'static;
138
139    /// Enqueue a blocking task to be executed.
140    ///
141    /// This method is designed for synchronous, potentially long-running operations that should
142    /// not block the asynchronous event loop. The task starts executing immediately, and the
143    /// returned handle can be awaited to retrieve the result.
144    ///
145    /// # Warning
146    ///
147    /// Blocking tasks cannot be aborted.
148    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
149    where
150        F: FnOnce() -> T + Send + 'static,
151        T: Send + 'static;
152
153    /// Signals the runtime to stop execution and that all outstanding tasks
154    /// should perform any required cleanup and exit. This method is idempotent and
155    /// can be called multiple times.
156    ///
157    /// This method does not actually kill any tasks but rather signals to them, using
158    /// the `Signal` returned by `stopped`, that they should exit.
159    fn stop(&self, value: i32);
160
161    /// Returns an instance of a `Signal` that resolves when `stop` is called by
162    /// any task.
163    ///
164    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
165    fn stopped(&self) -> Signal;
166}
167
168/// Interface to register and encode metrics.
169pub trait Metrics: Clone + Send + Sync + 'static {
170    /// Get the current label of the context.
171    fn label(&self) -> String;
172
173    /// Create a new instance of `Metrics` with the given label appended to the end
174    /// of the current `Metrics` label.
175    ///
176    /// This is commonly used to create a nested context for `register`.
177    ///
178    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
179    /// label (reserved for metrics for the runtime).
180    fn with_label(&self, label: &str) -> Self;
181
182    /// Prefix the given label with the current context's label.
183    ///
184    /// Unlike `with_label`, this method does not create a new context.
185    fn scoped_label(&self, label: &str) -> String {
186        let label = if self.label().is_empty() {
187            label.to_string()
188        } else {
189            format!("{}_{}", self.label(), label)
190        };
191        assert!(
192            !label.starts_with(METRICS_PREFIX),
193            "using runtime label is not allowed"
194        );
195        label
196    }
197
198    /// Register a metric with the runtime.
199    ///
200    /// Any registered metric will include (as a prefix) the label of the current context.
201    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203    /// Encode all metrics into a buffer.
204    fn encode(&self) -> String;
205}
206
207/// Interface that any task scheduler must implement to provide
208/// time-based operations.
209///
210/// It is necessary to mock time to provide deterministic execution
211/// of arbitrary tasks.
212pub trait Clock: Clone + Send + Sync + 'static {
213    /// Returns the current time.
214    fn current(&self) -> SystemTime;
215
216    /// Sleep for the given duration.
217    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
218
219    /// Sleep until the given deadline.
220    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
221}
222
223/// Interface that any runtime must implement to create
224/// network connections.
225pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
226where
227    L: Listener<Si, St>,
228    Si: Sink,
229    St: Stream,
230{
231    /// Bind to the given socket address.
232    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
233
234    /// Dial the given socket address.
235    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
236}
237
238/// Interface that any runtime must implement to handle
239/// incoming network connections.
240pub trait Listener<Si, St>: Sync + Send + 'static
241where
242    Si: Sink,
243    St: Stream,
244{
245    /// Accept an incoming connection.
246    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
247}
248
249/// Interface that any runtime must implement to send
250/// messages over a network connection.
251pub trait Sink: Sync + Send + 'static {
252    /// Send a message to the sink.
253    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
254}
255
256/// Interface that any runtime must implement to receive
257/// messages over a network connection.
258pub trait Stream: Sync + Send + 'static {
259    /// Receive a message from the stream, storing it in the given buffer.
260    /// Reads exactly the number of bytes that fit in the buffer.
261    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
262}
263
264/// Interface to interact with storage.
265///
266///
267/// To support storage implementations that enable concurrent reads and
268/// writes, blobs are responsible for maintaining synchronization.
269///
270/// Storage can be backed by a local filesystem, cloud storage, etc.
271pub trait Storage: Clone + Send + Sync + 'static {
272    /// The readable/writeable storage buffer that can be opened by this Storage.
273    type Blob: Blob;
274
275    /// Open an existing blob in a given partition or create a new one, returning
276    /// the blob and its length.
277    ///
278    /// Multiple instances of the same blob can be opened concurrently, however,
279    /// writing to the same blob concurrently may lead to undefined behavior.
280    fn open(
281        &self,
282        partition: &str,
283        name: &[u8],
284    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
285
286    /// Remove a blob from a given partition.
287    ///
288    /// If no `name` is provided, the entire partition is removed.
289    fn remove(
290        &self,
291        partition: &str,
292        name: Option<&[u8]>,
293    ) -> impl Future<Output = Result<(), Error>> + Send;
294
295    /// Return all blobs in a given partition.
296    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
297}
298
299/// Interface to read and write to a blob.
300///
301/// To support blob implementations that enable concurrent reads and
302/// writes, blobs are responsible for maintaining synchronization.
303///
304/// Cloning a blob is similar to wrapping a single file descriptor in
305/// a lock whereas opening a new blob (of the same name) is similar to
306/// opening a new file descriptor. If multiple blobs are opened with the same
307/// name, they are not expected to coordinate access to underlying storage
308/// and writing to both is undefined behavior.
309#[allow(clippy::len_without_is_empty)]
310pub trait Blob: Clone + Send + Sync + 'static {
311    /// Read from the blob at the given offset.
312    ///
313    /// `read_at` does not return the number of bytes read because it
314    /// only returns once the entire buffer has been filled.
315    fn read_at(
316        &self,
317        buf: &mut [u8],
318        offset: u64,
319    ) -> impl Future<Output = Result<(), Error>> + Send;
320
321    /// Write to the blob at the given offset.
322    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
323
324    /// Truncate the blob to the given length.
325    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
326
327    /// Ensure all pending data is durably persisted.
328    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
329
330    /// Close the blob.
331    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use commonware_macros::select;
338    use futures::channel::oneshot;
339    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
340    use prometheus_client::metrics::counter::Counter;
341    use std::collections::HashMap;
342    use std::panic::{catch_unwind, AssertUnwindSafe};
343    use std::str::FromStr;
344    use std::sync::Mutex;
345    use tracing::{error, Level};
346    use utils::reschedule;
347
348    fn test_error_future<R: Runner>(runner: R) {
349        async fn error_future() -> Result<&'static str, &'static str> {
350            Err("An error occurred")
351        }
352        let result = runner.start(|_| error_future());
353        assert_eq!(result, Err("An error occurred"));
354    }
355
356    fn test_clock_sleep<R: Runner>(runner: R)
357    where
358        R::Context: Spawner + Clock,
359    {
360        runner.start(|context| async move {
361            // Capture initial time
362            let start = context.current();
363            let sleep_duration = Duration::from_millis(10);
364            context.sleep(sleep_duration).await;
365
366            // After run, time should have advanced
367            let end = context.current();
368            assert!(end.duration_since(start).unwrap() >= sleep_duration);
369        });
370    }
371
372    fn test_clock_sleep_until<R: Runner>(runner: R)
373    where
374        R::Context: Spawner + Clock,
375    {
376        runner.start(|context| async move {
377            // Trigger sleep
378            let now = context.current();
379            context.sleep_until(now + Duration::from_millis(100)).await;
380
381            // Ensure slept duration has elapsed
382            let elapsed = now.elapsed().unwrap();
383            assert!(elapsed >= Duration::from_millis(100));
384        });
385    }
386
387    fn test_root_finishes<R: Runner>(runner: R)
388    where
389        R::Context: Spawner,
390    {
391        runner.start(|context| async move {
392            context.spawn(|_| async move {
393                loop {
394                    reschedule().await;
395                }
396            });
397        });
398    }
399
400    fn test_spawn_abort<R: Runner>(runner: R)
401    where
402        R::Context: Spawner,
403    {
404        runner.start(|context| async move {
405            let handle = context.spawn(|_| async move {
406                loop {
407                    reschedule().await;
408                }
409            });
410            handle.abort();
411            assert!(matches!(handle.await, Err(Error::Closed)));
412        });
413    }
414
415    fn test_panic_aborts_root<R: Runner>(runner: R) {
416        let result = catch_unwind(AssertUnwindSafe(|| {
417            runner.start(|_| async move {
418                panic!("blah");
419            });
420        }));
421        result.unwrap_err();
422    }
423
424    fn test_panic_aborts_spawn<R: Runner>(runner: R)
425    where
426        R::Context: Spawner,
427    {
428        let result = runner.start(|context| async move {
429            let result = context.spawn(|_| async move {
430                panic!("blah");
431            });
432            assert!(matches!(result.await, Err(Error::Exited)));
433            Result::<(), Error>::Ok(())
434        });
435
436        // Ensure panic was caught
437        result.unwrap();
438    }
439
440    fn test_select<R: Runner>(runner: R) {
441        runner.start(|_| async move {
442            // Test first branch
443            let output = Mutex::new(0);
444            select! {
445                v1 = ready(1) => {
446                    *output.lock().unwrap() = v1;
447                },
448                v2 = ready(2) => {
449                    *output.lock().unwrap() = v2;
450                },
451            };
452            assert_eq!(*output.lock().unwrap(), 1);
453
454            // Test second branch
455            select! {
456                v1 = std::future::pending::<i32>() => {
457                    *output.lock().unwrap() = v1;
458                },
459                v2 = ready(2) => {
460                    *output.lock().unwrap() = v2;
461                },
462            };
463            assert_eq!(*output.lock().unwrap(), 2);
464        });
465    }
466
467    /// Ensure future fusing works as expected.
468    fn test_select_loop<R: Runner>(runner: R)
469    where
470        R::Context: Clock,
471    {
472        runner.start(|context| async move {
473            // Should hit timeout
474            let (mut sender, mut receiver) = mpsc::unbounded();
475            for _ in 0..2 {
476                select! {
477                    v = receiver.next() => {
478                        panic!("unexpected value: {:?}", v);
479                    },
480                    _ = context.sleep(Duration::from_millis(100)) => {
481                        continue;
482                    },
483                };
484            }
485
486            // Populate channel
487            sender.send(0).await.unwrap();
488            sender.send(1).await.unwrap();
489
490            // Prefer not reading channel without losing messages
491            select! {
492                _ = async {} => {
493                    // Skip reading from channel even though populated
494                },
495                v = receiver.next() => {
496                    panic!("unexpected value: {:?}", v);
497                },
498            };
499
500            // Process messages
501            for i in 0..2 {
502                select! {
503                    _ = context.sleep(Duration::from_millis(100)) => {
504                        panic!("timeout");
505                    },
506                    v = receiver.next() => {
507                        assert_eq!(v.unwrap(), i);
508                    },
509                };
510            }
511        });
512    }
513
514    fn test_storage_operations<R: Runner>(runner: R)
515    where
516        R::Context: Storage,
517    {
518        runner.start(|context| async move {
519            let partition = "test_partition";
520            let name = b"test_blob";
521
522            // Open a new blob
523            let (blob, _) = context
524                .open(partition, name)
525                .await
526                .expect("Failed to open blob");
527
528            // Write data to the blob
529            let data = b"Hello, Storage!";
530            blob.write_at(data, 0)
531                .await
532                .expect("Failed to write to blob");
533
534            // Sync the blob
535            blob.sync().await.expect("Failed to sync blob");
536
537            // Read data from the blob
538            let mut buffer = vec![0u8; data.len()];
539            blob.read_at(&mut buffer, 0)
540                .await
541                .expect("Failed to read from blob");
542            assert_eq!(&buffer, data);
543
544            // Close the blob
545            blob.close().await.expect("Failed to close blob");
546
547            // Scan blobs in the partition
548            let blobs = context
549                .scan(partition)
550                .await
551                .expect("Failed to scan partition");
552            assert!(blobs.contains(&name.to_vec()));
553
554            // Reopen the blob
555            let (blob, len) = context
556                .open(partition, name)
557                .await
558                .expect("Failed to reopen blob");
559            assert_eq!(len, data.len() as u64);
560
561            // Read data part of message back
562            let mut buffer = vec![0u8; 7];
563            blob.read_at(&mut buffer, 7)
564                .await
565                .expect("Failed to read data");
566            assert_eq!(&buffer, b"Storage");
567
568            // Close the blob
569            blob.close().await.expect("Failed to close blob");
570
571            // Remove the blob
572            context
573                .remove(partition, Some(name))
574                .await
575                .expect("Failed to remove blob");
576
577            // Ensure the blob is removed
578            let blobs = context
579                .scan(partition)
580                .await
581                .expect("Failed to scan partition");
582            assert!(!blobs.contains(&name.to_vec()));
583
584            // Remove the partition
585            context
586                .remove(partition, None)
587                .await
588                .expect("Failed to remove partition");
589
590            // Scan the partition
591            let result = context.scan(partition).await;
592            assert!(matches!(result, Err(Error::PartitionMissing(_))));
593        });
594    }
595
596    fn test_blob_read_write<R: Runner>(runner: R)
597    where
598        R::Context: Storage,
599    {
600        runner.start(|context| async move {
601            let partition = "test_partition";
602            let name = b"test_blob_rw";
603
604            // Open a new blob
605            let (blob, _) = context
606                .open(partition, name)
607                .await
608                .expect("Failed to open blob");
609
610            // Write data at different offsets
611            let data1 = b"Hello";
612            let data2 = b"World";
613            blob.write_at(data1, 0)
614                .await
615                .expect("Failed to write data1");
616            blob.write_at(data2, 5)
617                .await
618                .expect("Failed to write data2");
619
620            // Read data back
621            let mut buffer = vec![0u8; 10];
622            blob.read_at(&mut buffer, 0)
623                .await
624                .expect("Failed to read data");
625            assert_eq!(&buffer[..5], data1);
626            assert_eq!(&buffer[5..], data2);
627
628            // Rewrite data without affecting length
629            let data3 = b"Store";
630            blob.write_at(data3, 5)
631                .await
632                .expect("Failed to write data3");
633
634            // Truncate the blob
635            blob.truncate(5).await.expect("Failed to truncate blob");
636            let mut buffer = vec![0u8; 5];
637            blob.read_at(&mut buffer, 0)
638                .await
639                .expect("Failed to read data");
640            assert_eq!(&buffer[..5], data1);
641
642            // Full read after truncation
643            let mut buffer = vec![0u8; 10];
644            let result = blob.read_at(&mut buffer, 0).await;
645            assert!(result.is_err());
646
647            // Close the blob
648            blob.close().await.expect("Failed to close blob");
649        });
650    }
651
652    fn test_many_partition_read_write<R: Runner>(runner: R)
653    where
654        R::Context: Storage,
655    {
656        runner.start(|context| async move {
657            let partitions = ["partition1", "partition2", "partition3"];
658            let name = b"test_blob_rw";
659            let data1 = b"Hello";
660            let data2 = b"World";
661
662            for (additional, partition) in partitions.iter().enumerate() {
663                // Open a new blob
664                let (blob, _) = context
665                    .open(partition, name)
666                    .await
667                    .expect("Failed to open blob");
668
669                // Write data at different offsets
670                blob.write_at(data1, 0)
671                    .await
672                    .expect("Failed to write data1");
673                blob.write_at(data2, 5 + additional as u64)
674                    .await
675                    .expect("Failed to write data2");
676
677                // Close the blob
678                blob.close().await.expect("Failed to close blob");
679            }
680
681            for (additional, partition) in partitions.iter().enumerate() {
682                // Open a new blob
683                let (blob, len) = context
684                    .open(partition, name)
685                    .await
686                    .expect("Failed to open blob");
687                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
688
689                // Read data back
690                let mut buffer = vec![0u8; 10 + additional];
691                blob.read_at(&mut buffer, 0)
692                    .await
693                    .expect("Failed to read data");
694                assert_eq!(&buffer[..5], b"Hello");
695                assert_eq!(&buffer[5 + additional..], b"World");
696
697                // Close the blob
698                blob.close().await.expect("Failed to close blob");
699            }
700        });
701    }
702
703    fn test_blob_read_past_length<R: Runner>(runner: R)
704    where
705        R::Context: Storage,
706    {
707        runner.start(|context| async move {
708            let partition = "test_partition";
709            let name = b"test_blob_rw";
710
711            // Open a new blob
712            let (blob, _) = context
713                .open(partition, name)
714                .await
715                .expect("Failed to open blob");
716
717            // Read data past file length (empty file)
718            let mut buffer = vec![0u8; 10];
719            let result = blob.read_at(&mut buffer, 0).await;
720            assert!(result.is_err());
721
722            // Write data to the blob
723            let data = b"Hello, Storage!";
724            blob.write_at(data, 0)
725                .await
726                .expect("Failed to write to blob");
727
728            // Read data past file length (non-empty file)
729            let mut buffer = vec![0u8; 20];
730            let result = blob.read_at(&mut buffer, 0).await;
731            assert!(result.is_err());
732        })
733    }
734
735    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
736    where
737        R::Context: Spawner + Storage + Metrics,
738    {
739        runner.start(|context| async move {
740            let partition = "test_partition";
741            let name = b"test_blob_rw";
742
743            // Open a new blob
744            let (blob, _) = context
745                .open(partition, name)
746                .await
747                .expect("Failed to open blob");
748
749            // Write data to the blob
750            let data = b"Hello, Storage!";
751            blob.write_at(data, 0)
752                .await
753                .expect("Failed to write to blob");
754
755            // Sync the blob
756            blob.sync().await.expect("Failed to sync blob");
757
758            // Read data from the blob in clone
759            let check1 = context.with_label("check1").spawn({
760                let blob = blob.clone();
761                move |_| async move {
762                    let mut buffer = vec![0u8; data.len()];
763                    blob.read_at(&mut buffer, 0)
764                        .await
765                        .expect("Failed to read from blob");
766                    assert_eq!(&buffer, data);
767                }
768            });
769            let check2 = context.with_label("check2").spawn({
770                let blob = blob.clone();
771                move |_| async move {
772                    let mut buffer = vec![0u8; data.len()];
773                    blob.read_at(&mut buffer, 0)
774                        .await
775                        .expect("Failed to read from blob");
776                    assert_eq!(&buffer, data);
777                }
778            });
779
780            // Wait for both reads to complete
781            let result = join!(check1, check2);
782            assert!(result.0.is_ok());
783            assert!(result.1.is_ok());
784
785            // Read data from the blob
786            let mut buffer = vec![0u8; data.len()];
787            blob.read_at(&mut buffer, 0)
788                .await
789                .expect("Failed to read from blob");
790            assert_eq!(&buffer, data);
791
792            // Close the blob
793            blob.close().await.expect("Failed to close blob");
794
795            // Ensure no blobs still open
796            let buffer = context.encode();
797            assert!(buffer.contains("open_blobs 0"));
798        });
799    }
800
801    fn test_shutdown<R: Runner>(runner: R)
802    where
803        R::Context: Spawner + Metrics + Clock,
804    {
805        let kill = 9;
806        runner.start(|context| async move {
807            // Spawn a task that waits for signal
808            let before = context
809                .with_label("before")
810                .spawn(move |context| async move {
811                    let sig = context.stopped().await;
812                    assert_eq!(sig.unwrap(), kill);
813                });
814
815            // Spawn a task after stop is called
816            let after = context
817                .with_label("after")
818                .spawn(move |context| async move {
819                    // Wait for stop signal
820                    let mut signal = context.stopped();
821                    loop {
822                        select! {
823                            sig = &mut signal => {
824                                // Stopper resolved
825                                assert_eq!(sig.unwrap(), kill);
826                                break;
827                            },
828                            _ = context.sleep(Duration::from_millis(10)) => {
829                                // Continue waiting
830                            },
831                        }
832                    }
833                });
834
835            // Sleep for a bit before stopping
836            context.sleep(Duration::from_millis(50)).await;
837
838            // Signal the task
839            context.stop(kill);
840
841            // Ensure both tasks complete
842            let result = join!(before, after);
843            assert!(result.0.is_ok());
844            assert!(result.1.is_ok());
845        });
846    }
847
848    fn test_spawn_ref<R: Runner>(runner: R)
849    where
850        R::Context: Spawner,
851    {
852        runner.start(|mut context| async move {
853            let handle = context.spawn_ref();
854            let result = handle(async move { 42 }).await;
855            assert!(matches!(result, Ok(42)));
856        });
857    }
858
859    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
860    where
861        R::Context: Spawner,
862    {
863        runner.start(|mut context| async move {
864            let handle = context.spawn_ref();
865            let result = handle(async move { 42 }).await;
866            assert!(matches!(result, Ok(42)));
867
868            // Ensure context is consumed
869            let handle = context.spawn_ref();
870            let result = handle(async move { 42 }).await;
871            assert!(matches!(result, Ok(42)));
872        });
873    }
874
875    fn test_spawn_duplicate<R: Runner>(runner: R)
876    where
877        R::Context: Spawner,
878    {
879        runner.start(|mut context| async move {
880            let handle = context.spawn_ref();
881            let result = handle(async move { 42 }).await;
882            assert!(matches!(result, Ok(42)));
883
884            // Ensure context is consumed
885            context.spawn(|_| async move { 42 });
886        });
887    }
888
889    fn test_spawn_blocking<R: Runner>(runner: R)
890    where
891        R::Context: Spawner,
892    {
893        runner.start(|context| async move {
894            let handle = context.spawn_blocking(|| 42);
895            let result = handle.await;
896            assert!(matches!(result, Ok(42)));
897        });
898    }
899
900    fn test_spawn_blocking_abort<R: Runner>(runner: R)
901    where
902        R::Context: Spawner,
903    {
904        runner.start(|context| async move {
905            // Create task
906            let (sender, mut receiver) = oneshot::channel();
907            let handle = context.spawn_blocking(move || {
908                // Wait for abort to be called
909                loop {
910                    if receiver.try_recv().is_ok() {
911                        break;
912                    }
913                }
914
915                // Perform a long-running operation
916                let mut count = 0;
917                loop {
918                    count += 1;
919                    if count >= 100_000_000 {
920                        break;
921                    }
922                }
923                count
924            });
925
926            // Abort the task
927            //
928            // If there was an `.await` prior to sending a message over the oneshot, this test
929            // could deadlock (depending on the runtime implementation) because the blocking task
930            // would never yield (preventing send from being called).
931            handle.abort();
932            sender.send(()).unwrap();
933
934            // Wait for the task to complete
935            assert!(matches!(handle.await, Ok(100_000_000)));
936        });
937    }
938
939    fn test_metrics<R: Runner>(runner: R)
940    where
941        R::Context: Metrics,
942    {
943        runner.start(|context| async move {
944            // Assert label
945            assert_eq!(context.label(), "");
946
947            // Register a metric
948            let counter = Counter::<u64>::default();
949            context.register("test", "test", counter.clone());
950
951            // Increment the counter
952            counter.inc();
953
954            // Encode metrics
955            let buffer = context.encode();
956            assert!(buffer.contains("test_total 1"));
957
958            // Nested context
959            let context = context.with_label("nested");
960            let nested_counter = Counter::<u64>::default();
961            context.register("test", "test", nested_counter.clone());
962
963            // Increment the counter
964            nested_counter.inc();
965
966            // Encode metrics
967            let buffer = context.encode();
968            assert!(buffer.contains("nested_test_total 1"));
969            assert!(buffer.contains("test_total 1"));
970        });
971    }
972
973    fn test_metrics_label<R: Runner>(runner: R)
974    where
975        R::Context: Metrics,
976    {
977        runner.start(|context| async move {
978            context.with_label(METRICS_PREFIX);
979        })
980    }
981
982    #[test]
983    fn test_deterministic_future() {
984        let runner = deterministic::Runner::default();
985        test_error_future(runner);
986    }
987
988    #[test]
989    fn test_deterministic_clock_sleep() {
990        let executor = deterministic::Runner::default();
991        test_clock_sleep(executor);
992    }
993
994    #[test]
995    fn test_deterministic_clock_sleep_until() {
996        let executor = deterministic::Runner::default();
997        test_clock_sleep_until(executor);
998    }
999
1000    #[test]
1001    fn test_deterministic_root_finishes() {
1002        let executor = deterministic::Runner::default();
1003        test_root_finishes(executor);
1004    }
1005
1006    #[test]
1007    fn test_deterministic_spawn_abort() {
1008        let executor = deterministic::Runner::default();
1009        test_spawn_abort(executor);
1010    }
1011
1012    #[test]
1013    fn test_deterministic_panic_aborts_root() {
1014        let runner = deterministic::Runner::default();
1015        test_panic_aborts_root(runner);
1016    }
1017
1018    #[test]
1019    #[should_panic(expected = "blah")]
1020    fn test_deterministic_panic_aborts_spawn() {
1021        let executor = deterministic::Runner::default();
1022        test_panic_aborts_spawn(executor);
1023    }
1024
1025    #[test]
1026    fn test_deterministic_select() {
1027        let executor = deterministic::Runner::default();
1028        test_select(executor);
1029    }
1030
1031    #[test]
1032    fn test_deterministic_select_loop() {
1033        let executor = deterministic::Runner::default();
1034        test_select_loop(executor);
1035    }
1036
1037    #[test]
1038    fn test_deterministic_storage_operations() {
1039        let executor = deterministic::Runner::default();
1040        test_storage_operations(executor);
1041    }
1042
1043    #[test]
1044    fn test_deterministic_blob_read_write() {
1045        let executor = deterministic::Runner::default();
1046        test_blob_read_write(executor);
1047    }
1048
1049    #[test]
1050    fn test_deterministic_many_partition_read_write() {
1051        let executor = deterministic::Runner::default();
1052        test_many_partition_read_write(executor);
1053    }
1054
1055    #[test]
1056    fn test_deterministic_blob_read_past_length() {
1057        let executor = deterministic::Runner::default();
1058        test_blob_read_past_length(executor);
1059    }
1060
1061    #[test]
1062    fn test_deterministic_blob_clone_and_concurrent_read() {
1063        // Run test
1064        let executor = deterministic::Runner::default();
1065        test_blob_clone_and_concurrent_read(executor);
1066    }
1067
1068    #[test]
1069    fn test_deterministic_shutdown() {
1070        let executor = deterministic::Runner::default();
1071        test_shutdown(executor);
1072    }
1073
1074    #[test]
1075    fn test_deterministic_spawn_ref() {
1076        let executor = deterministic::Runner::default();
1077        test_spawn_ref(executor);
1078    }
1079
1080    #[test]
1081    #[should_panic]
1082    fn test_deterministic_spawn_ref_duplicate() {
1083        let executor = deterministic::Runner::default();
1084        test_spawn_ref_duplicate(executor);
1085    }
1086
1087    #[test]
1088    #[should_panic]
1089    fn test_deterministic_spawn_duplicate() {
1090        let executor = deterministic::Runner::default();
1091        test_spawn_duplicate(executor);
1092    }
1093
1094    #[test]
1095    fn test_deterministic_spawn_blocking() {
1096        let executor = deterministic::Runner::default();
1097        test_spawn_blocking(executor);
1098    }
1099
1100    #[test]
1101    #[should_panic(expected = "blocking task panicked")]
1102    fn test_deterministic_spawn_blocking_panic() {
1103        let executor = deterministic::Runner::default();
1104        executor.start(|context| async move {
1105            let handle = context.spawn_blocking(|| {
1106                panic!("blocking task panicked");
1107            });
1108            handle.await.unwrap();
1109        });
1110    }
1111
1112    #[test]
1113    fn test_deterministic_spawn_blocking_abort() {
1114        let executor = deterministic::Runner::default();
1115        test_spawn_blocking_abort(executor);
1116    }
1117
1118    #[test]
1119    fn test_deterministic_metrics() {
1120        let executor = deterministic::Runner::default();
1121        test_metrics(executor);
1122    }
1123
1124    #[test]
1125    #[should_panic]
1126    fn test_deterministic_metrics_label() {
1127        let executor = deterministic::Runner::default();
1128        test_metrics_label(executor);
1129    }
1130
1131    #[test]
1132    fn test_tokio_error_future() {
1133        let runner = tokio::Runner::default();
1134        test_error_future(runner);
1135    }
1136
1137    #[test]
1138    fn test_tokio_clock_sleep() {
1139        let executor = tokio::Runner::default();
1140        test_clock_sleep(executor);
1141    }
1142
1143    #[test]
1144    fn test_tokio_clock_sleep_until() {
1145        let executor = tokio::Runner::default();
1146        test_clock_sleep_until(executor);
1147    }
1148
1149    #[test]
1150    fn test_tokio_root_finishes() {
1151        let executor = tokio::Runner::default();
1152        test_root_finishes(executor);
1153    }
1154
1155    #[test]
1156    fn test_tokio_spawn_abort() {
1157        let executor = tokio::Runner::default();
1158        test_spawn_abort(executor);
1159    }
1160
1161    #[test]
1162    fn test_tokio_panic_aborts_root() {
1163        let executor = tokio::Runner::default();
1164        test_panic_aborts_root(executor);
1165    }
1166
1167    #[test]
1168    fn test_tokio_panic_aborts_spawn() {
1169        let executor = tokio::Runner::default();
1170        test_panic_aborts_spawn(executor);
1171    }
1172
1173    #[test]
1174    fn test_tokio_select() {
1175        let executor = tokio::Runner::default();
1176        test_select(executor);
1177    }
1178
1179    #[test]
1180    fn test_tokio_select_loop() {
1181        let executor = tokio::Runner::default();
1182        test_select_loop(executor);
1183    }
1184
1185    #[test]
1186    fn test_tokio_storage_operations() {
1187        let executor = tokio::Runner::default();
1188        test_storage_operations(executor);
1189    }
1190
1191    #[test]
1192    fn test_tokio_blob_read_write() {
1193        let executor = tokio::Runner::default();
1194        test_blob_read_write(executor);
1195    }
1196
1197    #[test]
1198    fn test_tokio_many_partition_read_write() {
1199        let executor = tokio::Runner::default();
1200        test_many_partition_read_write(executor);
1201    }
1202
1203    #[test]
1204    fn test_tokio_blob_read_past_length() {
1205        let executor = tokio::Runner::default();
1206        test_blob_read_past_length(executor);
1207    }
1208
1209    #[test]
1210    fn test_tokio_blob_clone_and_concurrent_read() {
1211        // Run test
1212        let executor = tokio::Runner::default();
1213        test_blob_clone_and_concurrent_read(executor);
1214    }
1215
1216    #[test]
1217    fn test_tokio_shutdown() {
1218        let executor = tokio::Runner::default();
1219        test_shutdown(executor);
1220    }
1221
1222    #[test]
1223    fn test_tokio_spawn_ref() {
1224        let executor = tokio::Runner::default();
1225        test_spawn_ref(executor);
1226    }
1227
1228    #[test]
1229    #[should_panic]
1230    fn test_tokio_spawn_ref_duplicate() {
1231        let executor = tokio::Runner::default();
1232        test_spawn_ref_duplicate(executor);
1233    }
1234
1235    #[test]
1236    #[should_panic]
1237    fn test_tokio_spawn_duplicate() {
1238        let executor = tokio::Runner::default();
1239        test_spawn_duplicate(executor);
1240    }
1241
1242    #[test]
1243    fn test_tokio_spawn_blocking() {
1244        let executor = tokio::Runner::default();
1245        test_spawn_blocking(executor);
1246    }
1247
1248    #[test]
1249    fn test_tokio_spawn_blocking_panic() {
1250        let executor = tokio::Runner::default();
1251        executor.start(|context| async move {
1252            let handle = context.spawn_blocking(|| {
1253                panic!("blocking task panicked");
1254            });
1255            let result = handle.await;
1256            assert!(matches!(result, Err(Error::Exited)));
1257        });
1258    }
1259
1260    #[test]
1261    fn test_tokio_spawn_blocking_abort() {
1262        let executor = tokio::Runner::default();
1263        test_spawn_blocking_abort(executor);
1264    }
1265
1266    #[test]
1267    fn test_tokio_metrics() {
1268        let executor = tokio::Runner::default();
1269        test_metrics(executor);
1270    }
1271
1272    #[test]
1273    #[should_panic]
1274    fn test_tokio_metrics_label() {
1275        let executor = tokio::Runner::default();
1276        test_metrics_label(executor);
1277    }
1278
1279    #[test]
1280    fn test_tokio_telemetry() {
1281        let executor = tokio::Runner::default();
1282        executor.start(|context| async move {
1283            // Define the server address
1284            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1285
1286            // Configure telemetry
1287            tokio::telemetry::init(
1288                context.with_label("metrics"),
1289                Level::INFO,
1290                Some(address),
1291                None,
1292            );
1293
1294            // Register a test metric
1295            let counter: Counter<u64> = Counter::default();
1296            context.register("test_counter", "Test counter", counter.clone());
1297            counter.inc();
1298
1299            // Helper functions to parse HTTP response
1300            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1301                let mut line = Vec::new();
1302                loop {
1303                    let mut byte = [0; 1];
1304                    stream.recv(&mut byte).await?;
1305                    if byte[0] == b'\n' {
1306                        if line.last() == Some(&b'\r') {
1307                            line.pop(); // Remove trailing \r
1308                        }
1309                        break;
1310                    }
1311                    line.push(byte[0]);
1312                }
1313                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1314            }
1315
1316            async fn read_headers<St: Stream>(
1317                stream: &mut St,
1318            ) -> Result<HashMap<String, String>, Error> {
1319                let mut headers = HashMap::new();
1320                loop {
1321                    let line = read_line(stream).await?;
1322                    if line.is_empty() {
1323                        break;
1324                    }
1325                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1326                    if parts.len() == 2 {
1327                        headers.insert(parts[0].to_string(), parts[1].to_string());
1328                    }
1329                }
1330                Ok(headers)
1331            }
1332
1333            async fn read_body<St: Stream>(
1334                stream: &mut St,
1335                content_length: usize,
1336            ) -> Result<String, Error> {
1337                let mut body = vec![0; content_length];
1338                stream.recv(&mut body).await?;
1339                String::from_utf8(body).map_err(|_| Error::ReadFailed)
1340            }
1341
1342            // Simulate a client connecting to the server
1343            let client_handle = context
1344                .with_label("client")
1345                .spawn(move |context| async move {
1346                    let (mut sink, mut stream) = loop {
1347                        match context.dial(address).await {
1348                            Ok((sink, stream)) => break (sink, stream),
1349                            Err(e) => {
1350                                // The client may be polled before the server is ready, that's alright!
1351                                error!(err =?e, "failed to connect");
1352                                context.sleep(Duration::from_millis(10)).await;
1353                            }
1354                        }
1355                    };
1356
1357                    // Send a GET request to the server
1358                    let request = format!(
1359                        "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1360                        address
1361                    );
1362                    sink.send(request.as_bytes()).await.unwrap();
1363
1364                    // Read and verify the HTTP status line
1365                    let status_line = read_line(&mut stream).await.unwrap();
1366                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1367
1368                    // Read and parse headers
1369                    let headers = read_headers(&mut stream).await.unwrap();
1370                    println!("Headers: {:?}", headers);
1371                    let content_length = headers
1372                        .get("content-length")
1373                        .unwrap()
1374                        .parse::<usize>()
1375                        .unwrap();
1376
1377                    // Read and verify the body
1378                    let body = read_body(&mut stream, content_length).await.unwrap();
1379                    assert!(body.contains("test_counter_total 1"));
1380                });
1381
1382            // Wait for the client task to complete
1383            client_handle.await.unwrap();
1384        });
1385    }
1386}