commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::{
22    future::Future,
23    net::SocketAddr,
24    time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31    if #[cfg(not(target_arch = "wasm32"))] {
32        pub mod tokio;
33    }
34}
35pub mod telemetry;
36mod utils;
37pub use utils::{reschedule, Handle, Signal, Signaler};
38
/// Label prefix set aside for metrics emitted by the runtime itself.
///
/// `Metrics::scoped_label` rejects any scoped label that starts with this value.
const METRICS_PREFIX: &str = "runtime";
41
42/// Errors that can occur when interacting with the runtime.
43#[derive(Error, Debug, PartialEq)]
44pub enum Error {
45    #[error("exited")]
46    Exited,
47    #[error("closed")]
48    Closed,
49    #[error("timeout")]
50    Timeout,
51    #[error("bind failed")]
52    BindFailed,
53    #[error("connection failed")]
54    ConnectionFailed,
55    #[error("write failed")]
56    WriteFailed,
57    #[error("read failed")]
58    ReadFailed,
59    #[error("send failed")]
60    SendFailed,
61    #[error("recv failed")]
62    RecvFailed,
63    #[error("partition creation failed: {0}")]
64    PartitionCreationFailed(String),
65    #[error("partition missing: {0}")]
66    PartitionMissing(String),
67    #[error("partition corrupt: {0}")]
68    PartitionCorrupt(String),
69    #[error("blob open failed: {0}/{1}")]
70    BlobOpenFailed(String, String),
71    #[error("blob missing: {0}/{1}")]
72    BlobMissing(String, String),
73    #[error("blob truncate failed: {0}/{1}")]
74    BlobTruncateFailed(String, String),
75    #[error("blob sync failed: {0}/{1}")]
76    BlobSyncFailed(String, String),
77    #[error("blob close failed: {0}/{1}")]
78    BlobCloseFailed(String, String),
79    #[error("blob insufficient length")]
80    BlobInsufficientLength,
81    #[error("offset overflow")]
82    OffsetOverflow,
83}
84
/// Entry point of a task scheduler: drives a root task to completion.
pub trait Runner {
    /// Runs `f` as the root task and returns its output.
    ///
    /// The root task is handed a ready-made future rather than a context
    /// factory, because it can be useful to hold a reference to the context
    /// before task execution begins.
    fn start<F: Future + Send + 'static>(self, f: F) -> F::Output
    where
        F::Output: Send + 'static;
}
97
98/// Interface that any task scheduler must implement to spawn tasks.
99pub trait Spawner: Clone + Send + Sync + 'static {
100    /// Enqueue a task to be executed.
101    ///
102    /// Unlike a future, a spawned task will start executing immediately (even if the caller
103    /// does not await the handle).
104    ///
105    /// Spawned tasks consume the context used to create them. This ensures that context cannot
106    /// be shared between tasks and that a task's context always comes from somewhere.
107    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
108    where
109        F: FnOnce(Self) -> Fut + Send + 'static,
110        Fut: Future<Output = T> + Send + 'static,
111        T: Send + 'static;
112
113    /// Enqueue a task to be executed (without consuming the context).
114    ///
115    /// Unlike a future, a spawned task will start executing immediately (even if the caller
116    /// does not await the handle).
117    ///
118    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
119    /// an actor that already has a reference to context).
120    ///
121    /// # Warning
122    ///
123    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
124    /// to prevent accidental misuse.
125    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
126    where
127        F: Future<Output = T> + Send + 'static,
128        T: Send + 'static;
129
130    /// Enqueue a blocking task to be executed.
131    ///
132    /// This method is designed for synchronous, potentially long-running operations that should
133    /// not block the asynchronous event loop. The task starts executing immediately, and the
134    /// returned handle can be awaited to retrieve the result.
135    ///
136    /// # Warning
137    ///
138    /// Blocking tasks cannot be aborted.
139    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
140    where
141        F: FnOnce() -> T + Send + 'static,
142        T: Send + 'static;
143
144    /// Signals the runtime to stop execution and that all outstanding tasks
145    /// should perform any required cleanup and exit. This method is idempotent and
146    /// can be called multiple times.
147    ///
148    /// This method does not actually kill any tasks but rather signals to them, using
149    /// the `Signal` returned by `stopped`, that they should exit.
150    fn stop(&self, value: i32);
151
152    /// Returns an instance of a `Signal` that resolves when `stop` is called by
153    /// any task.
154    ///
155    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
156    fn stopped(&self) -> Signal;
157}
158
159/// Interface to register and encode metrics.
160pub trait Metrics: Clone + Send + Sync + 'static {
161    /// Get the current label of the context.
162    fn label(&self) -> String;
163
164    /// Create a new instance of `Metrics` with the given label appended to the end
165    /// of the current `Metrics` label.
166    ///
167    /// This is commonly used to create a nested context for `register`.
168    ///
169    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
170    /// label (reserved for metrics for the runtime).
171    fn with_label(&self, label: &str) -> Self;
172
173    /// Prefix the given label with the current context's label.
174    ///
175    /// Unlike `with_label`, this method does not create a new context.
176    fn scoped_label(&self, label: &str) -> String {
177        let label = if self.label().is_empty() {
178            label.to_string()
179        } else {
180            format!("{}_{}", self.label(), label)
181        };
182        assert!(
183            !label.starts_with(METRICS_PREFIX),
184            "using runtime label is not allowed"
185        );
186        label
187    }
188
189    /// Register a metric with the runtime.
190    ///
191    /// Any registered metric will include (as a prefix) the label of the current context.
192    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
193
194    /// Encode all metrics into a buffer.
195    fn encode(&self) -> String;
196}
197
/// Time source and timers supplied by a task scheduler.
///
/// Time goes through this trait (rather than the system clock directly) so it
/// can be mocked, which is required for deterministic execution of arbitrary
/// tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration` has passed.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
213
214/// Interface that any runtime must implement to create
215/// network connections.
216pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
217where
218    L: Listener<Si, St>,
219    Si: Sink,
220    St: Stream,
221{
222    /// Bind to the given socket address.
223    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
224
225    /// Dial the given socket address.
226    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
227}
228
229/// Interface that any runtime must implement to handle
230/// incoming network connections.
231pub trait Listener<Si, St>: Sync + Send + 'static
232where
233    Si: Sink,
234    St: Stream,
235{
236    /// Accept an incoming connection.
237    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
238}
239
240/// Interface that any runtime must implement to send
241/// messages over a network connection.
242pub trait Sink: Sync + Send + 'static {
243    /// Send a message to the sink.
244    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
245}
246
247/// Interface that any runtime must implement to receive
248/// messages over a network connection.
249pub trait Stream: Sync + Send + 'static {
250    /// Receive a message from the stream, storing it in the given buffer.
251    /// Reads exactly the number of bytes that fit in the buffer.
252    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
253}
254
255/// Interface to interact with storage.
256///
257///
258/// To support storage implementations that enable concurrent reads and
259/// writes, blobs are responsible for maintaining synchronization.
260///
261/// Storage can be backed by a local filesystem, cloud storage, etc.
262pub trait Storage<B>: Clone + Send + Sync + 'static
263where
264    B: Blob,
265{
266    /// Open an existing blob in a given partition or create a new one.
267    ///
268    /// Multiple instances of the same blob can be opened concurrently, however,
269    /// writing to the same blob concurrently may lead to undefined behavior.
270    fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
271
272    /// Remove a blob from a given partition.
273    ///
274    /// If no `name` is provided, the entire partition is removed.
275    fn remove(
276        &self,
277        partition: &str,
278        name: Option<&[u8]>,
279    ) -> impl Future<Output = Result<(), Error>> + Send;
280
281    /// Return all blobs in a given partition.
282    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
283}
284
285/// Interface to read and write to a blob.
286///
287/// To support blob implementations that enable concurrent reads and
288/// writes, blobs are responsible for maintaining synchronization.
289///
290/// Cloning a blob is similar to wrapping a single file descriptor in
291/// a lock whereas opening a new blob (of the same name) is similar to
292/// opening a new file descriptor. If multiple blobs are opened with the same
293/// name, they are not expected to coordinate access to underlying storage
294/// and writing to both is undefined behavior.
295#[allow(clippy::len_without_is_empty)]
296pub trait Blob: Clone + Send + Sync + 'static {
297    /// Get the length of the blob.
298    fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
299
300    /// Read from the blob at the given offset.
301    ///
302    /// `read_at` does not return the number of bytes read because it
303    /// only returns once the entire buffer has been filled.
304    fn read_at(
305        &self,
306        buf: &mut [u8],
307        offset: u64,
308    ) -> impl Future<Output = Result<(), Error>> + Send;
309
310    /// Write to the blob at the given offset.
311    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
312
313    /// Truncate the blob to the given length.
314    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
315
316    /// Ensure all pending data is durably persisted.
317    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
318
319    /// Close the blob.
320    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use commonware_macros::select;
327    use futures::channel::oneshot;
328    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
329    use prometheus_client::metrics::counter::Counter;
330    use std::collections::HashMap;
331    use std::panic::{catch_unwind, AssertUnwindSafe};
332    use std::str::FromStr;
333    use std::sync::Mutex;
334    use telemetry::metrics;
335    use tracing::error;
336    use utils::reschedule;
337
338    fn test_error_future(runner: impl Runner) {
339        async fn error_future() -> Result<&'static str, &'static str> {
340            Err("An error occurred")
341        }
342        let result = runner.start(error_future());
343        assert_eq!(result, Err("An error occurred"));
344    }
345
346    fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
347        runner.start(async move {
348            // Capture initial time
349            let start = context.current();
350            let sleep_duration = Duration::from_millis(10);
351            context.sleep(sleep_duration).await;
352
353            // After run, time should have advanced
354            let end = context.current();
355            assert!(end.duration_since(start).unwrap() >= sleep_duration);
356        });
357    }
358
359    fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
360        runner.start(async move {
361            // Trigger sleep
362            let now = context.current();
363            context.sleep_until(now + Duration::from_millis(100)).await;
364
365            // Ensure slept duration has elapsed
366            let elapsed = now.elapsed().unwrap();
367            assert!(elapsed >= Duration::from_millis(100));
368        });
369    }
370
371    fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
372        runner.start(async move {
373            context.spawn(|_| async move {
374                loop {
375                    reschedule().await;
376                }
377            });
378        });
379    }
380
381    fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
382        runner.start(async move {
383            let handle = context.spawn(|_| async move {
384                loop {
385                    reschedule().await;
386                }
387            });
388            handle.abort();
389            assert_eq!(handle.await, Err(Error::Closed));
390        });
391    }
392
393    fn test_panic_aborts_root(runner: impl Runner) {
394        let result = catch_unwind(AssertUnwindSafe(|| {
395            runner.start(async move {
396                panic!("blah");
397            });
398        }));
399        result.unwrap_err();
400    }
401
402    fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
403        let result = runner.start(async move {
404            let result = context.spawn(|_| async move {
405                panic!("blah");
406            });
407            assert_eq!(result.await, Err(Error::Exited));
408            Result::<(), Error>::Ok(())
409        });
410
411        // Ensure panic was caught
412        result.unwrap();
413    }
414
415    fn test_select(runner: impl Runner) {
416        runner.start(async move {
417            // Test first branch
418            let output = Mutex::new(0);
419            select! {
420                v1 = ready(1) => {
421                    *output.lock().unwrap() = v1;
422                },
423                v2 = ready(2) => {
424                    *output.lock().unwrap() = v2;
425                },
426            };
427            assert_eq!(*output.lock().unwrap(), 1);
428
429            // Test second branch
430            select! {
431                v1 = std::future::pending::<i32>() => {
432                    *output.lock().unwrap() = v1;
433                },
434                v2 = ready(2) => {
435                    *output.lock().unwrap() = v2;
436                },
437            };
438            assert_eq!(*output.lock().unwrap(), 2);
439        });
440    }
441
442    /// Ensure future fusing works as expected.
443    fn test_select_loop(runner: impl Runner, context: impl Clock) {
444        runner.start(async move {
445            // Should hit timeout
446            let (mut sender, mut receiver) = mpsc::unbounded();
447            for _ in 0..2 {
448                select! {
449                    v = receiver.next() => {
450                        panic!("unexpected value: {:?}", v);
451                    },
452                    _ = context.sleep(Duration::from_millis(100)) => {
453                        continue;
454                    },
455                };
456            }
457
458            // Populate channel
459            sender.send(0).await.unwrap();
460            sender.send(1).await.unwrap();
461
462            // Prefer not reading channel without losing messages
463            select! {
464                _ = async {} => {
465                    // Skip reading from channel even though populated
466                },
467                v = receiver.next() => {
468                    panic!("unexpected value: {:?}", v);
469                },
470            };
471
472            // Process messages
473            for i in 0..2 {
474                select! {
475                    _ = context.sleep(Duration::from_millis(100)) => {
476                        panic!("timeout");
477                    },
478                    v = receiver.next() => {
479                        assert_eq!(v.unwrap(), i);
480                    },
481                };
482            }
483        });
484    }
485
486    fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
487    where
488        B: Blob,
489    {
490        runner.start(async move {
491            let partition = "test_partition";
492            let name = b"test_blob";
493
494            // Open a new blob
495            let blob = context
496                .open(partition, name)
497                .await
498                .expect("Failed to open blob");
499
500            // Write data to the blob
501            let data = b"Hello, Storage!";
502            blob.write_at(data, 0)
503                .await
504                .expect("Failed to write to blob");
505
506            // Sync the blob
507            blob.sync().await.expect("Failed to sync blob");
508
509            // Read data from the blob
510            let mut buffer = vec![0u8; data.len()];
511            blob.read_at(&mut buffer, 0)
512                .await
513                .expect("Failed to read from blob");
514            assert_eq!(&buffer, data);
515
516            // Get blob length
517            let length = blob.len().await.expect("Failed to get blob length");
518            assert_eq!(length, data.len() as u64);
519
520            // Close the blob
521            blob.close().await.expect("Failed to close blob");
522
523            // Scan blobs in the partition
524            let blobs = context
525                .scan(partition)
526                .await
527                .expect("Failed to scan partition");
528            assert!(blobs.contains(&name.to_vec()));
529
530            // Reopen the blob
531            let blob = context
532                .open(partition, name)
533                .await
534                .expect("Failed to reopen blob");
535
536            // Read data part of message back
537            let mut buffer = vec![0u8; 7];
538            blob.read_at(&mut buffer, 7)
539                .await
540                .expect("Failed to read data");
541            assert_eq!(&buffer, b"Storage");
542
543            // Close the blob
544            blob.close().await.expect("Failed to close blob");
545
546            // Remove the blob
547            context
548                .remove(partition, Some(name))
549                .await
550                .expect("Failed to remove blob");
551
552            // Ensure the blob is removed
553            let blobs = context
554                .scan(partition)
555                .await
556                .expect("Failed to scan partition");
557            assert!(!blobs.contains(&name.to_vec()));
558
559            // Remove the partition
560            context
561                .remove(partition, None)
562                .await
563                .expect("Failed to remove partition");
564
565            // Scan the partition
566            let result = context.scan(partition).await;
567            assert!(matches!(result, Err(Error::PartitionMissing(_))));
568        });
569    }
570
571    fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
572    where
573        B: Blob,
574    {
575        runner.start(async move {
576            let partition = "test_partition";
577            let name = b"test_blob_rw";
578
579            // Open a new blob
580            let blob = context
581                .open(partition, name)
582                .await
583                .expect("Failed to open blob");
584
585            // Write data at different offsets
586            let data1 = b"Hello";
587            let data2 = b"World";
588            blob.write_at(data1, 0)
589                .await
590                .expect("Failed to write data1");
591            blob.write_at(data2, 5)
592                .await
593                .expect("Failed to write data2");
594
595            // Assert that length tracks pending data
596            let length = blob.len().await.expect("Failed to get blob length");
597            assert_eq!(length, 10);
598
599            // Read data back
600            let mut buffer = vec![0u8; 10];
601            blob.read_at(&mut buffer, 0)
602                .await
603                .expect("Failed to read data");
604            assert_eq!(&buffer[..5], data1);
605            assert_eq!(&buffer[5..], data2);
606
607            // Rewrite data without affecting length
608            let data3 = b"Store";
609            blob.write_at(data3, 5)
610                .await
611                .expect("Failed to write data3");
612            let length = blob.len().await.expect("Failed to get blob length");
613            assert_eq!(length, 10);
614
615            // Truncate the blob
616            blob.truncate(5).await.expect("Failed to truncate blob");
617            let length = blob.len().await.expect("Failed to get blob length");
618            assert_eq!(length, 5);
619            let mut buffer = vec![0u8; 5];
620            blob.read_at(&mut buffer, 0)
621                .await
622                .expect("Failed to read data");
623            assert_eq!(&buffer[..5], data1);
624
625            // Full read after truncation
626            let mut buffer = vec![0u8; 10];
627            let result = blob.read_at(&mut buffer, 0).await;
628            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
629
630            // Close the blob
631            blob.close().await.expect("Failed to close blob");
632        });
633    }
634
635    fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
636    where
637        B: Blob,
638    {
639        runner.start(async move {
640            let partitions = ["partition1", "partition2", "partition3"];
641            let name = b"test_blob_rw";
642
643            for (additional, partition) in partitions.iter().enumerate() {
644                // Open a new blob
645                let blob = context
646                    .open(partition, name)
647                    .await
648                    .expect("Failed to open blob");
649
650                // Write data at different offsets
651                let data1 = b"Hello";
652                let data2 = b"World";
653                blob.write_at(data1, 0)
654                    .await
655                    .expect("Failed to write data1");
656                blob.write_at(data2, 5 + additional as u64)
657                    .await
658                    .expect("Failed to write data2");
659
660                // Close the blob
661                blob.close().await.expect("Failed to close blob");
662            }
663
664            for (additional, partition) in partitions.iter().enumerate() {
665                // Open a new blob
666                let blob = context
667                    .open(partition, name)
668                    .await
669                    .expect("Failed to open blob");
670
671                // Read data back
672                let mut buffer = vec![0u8; 10 + additional];
673                blob.read_at(&mut buffer, 0)
674                    .await
675                    .expect("Failed to read data");
676                assert_eq!(&buffer[..5], b"Hello");
677                assert_eq!(&buffer[5 + additional..], b"World");
678
679                // Close the blob
680                blob.close().await.expect("Failed to close blob");
681            }
682        });
683    }
684
685    fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
686    where
687        B: Blob,
688    {
689        runner.start(async move {
690            let partition = "test_partition";
691            let name = b"test_blob_rw";
692
693            // Open a new blob
694            let blob = context
695                .open(partition, name)
696                .await
697                .expect("Failed to open blob");
698
699            // Read data past file length (empty file)
700            let mut buffer = vec![0u8; 10];
701            let result = blob.read_at(&mut buffer, 0).await;
702            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
703
704            // Write data to the blob
705            let data = b"Hello, Storage!";
706            blob.write_at(data, 0)
707                .await
708                .expect("Failed to write to blob");
709
710            // Read data past file length (non-empty file)
711            let mut buffer = vec![0u8; 20];
712            let result = blob.read_at(&mut buffer, 0).await;
713            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
714        })
715    }
716
717    fn test_blob_clone_and_concurrent_read<B>(
718        runner: impl Runner,
719        context: impl Spawner + Storage<B> + Metrics,
720    ) where
721        B: Blob,
722    {
723        runner.start(async move {
724            let partition = "test_partition";
725            let name = b"test_blob_rw";
726
727            // Open a new blob
728            let blob = context
729                .open(partition, name)
730                .await
731                .expect("Failed to open blob");
732
733            // Write data to the blob
734            let data = b"Hello, Storage!";
735            blob.write_at(data, 0)
736                .await
737                .expect("Failed to write to blob");
738
739            // Sync the blob
740            blob.sync().await.expect("Failed to sync blob");
741
742            // Read data from the blob in clone
743            let check1 = context.with_label("check1").spawn({
744                let blob = blob.clone();
745                move |_| async move {
746                    let mut buffer = vec![0u8; data.len()];
747                    blob.read_at(&mut buffer, 0)
748                        .await
749                        .expect("Failed to read from blob");
750                    assert_eq!(&buffer, data);
751                }
752            });
753            let check2 = context.with_label("check2").spawn({
754                let blob = blob.clone();
755                move |_| async move {
756                    let mut buffer = vec![0u8; data.len()];
757                    blob.read_at(&mut buffer, 0)
758                        .await
759                        .expect("Failed to read from blob");
760                    assert_eq!(&buffer, data);
761                }
762            });
763
764            // Wait for both reads to complete
765            let result = join!(check1, check2);
766            assert!(result.0.is_ok());
767            assert!(result.1.is_ok());
768
769            // Read data from the blob
770            let mut buffer = vec![0u8; data.len()];
771            blob.read_at(&mut buffer, 0)
772                .await
773                .expect("Failed to read from blob");
774            assert_eq!(&buffer, data);
775
776            // Get blob length
777            let length = blob.len().await.expect("Failed to get blob length");
778            assert_eq!(length, data.len() as u64);
779
780            // Close the blob
781            blob.close().await.expect("Failed to close blob");
782        });
783    }
784
785    fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
786        let kill = 9;
787        runner.start(async move {
788            // Spawn a task that waits for signal
789            let before = context
790                .with_label("before")
791                .spawn(move |context| async move {
792                    let sig = context.stopped().await;
793                    assert_eq!(sig.unwrap(), kill);
794                });
795
796            // Spawn a task after stop is called
797            let after = context
798                .with_label("after")
799                .spawn(move |context| async move {
800                    // Wait for stop signal
801                    let mut signal = context.stopped();
802                    loop {
803                        select! {
804                            sig = &mut signal => {
805                                // Stopper resolved
806                                assert_eq!(sig.unwrap(), kill);
807                                break;
808                            },
809                            _ = context.sleep(Duration::from_millis(10)) => {
810                                // Continue waiting
811                            },
812                        }
813                    }
814                });
815
816            // Sleep for a bit before stopping
817            context.sleep(Duration::from_millis(50)).await;
818
819            // Signal the task
820            context.stop(kill);
821
822            // Ensure both tasks complete
823            let result = join!(before, after);
824            assert!(result.0.is_ok());
825            assert!(result.1.is_ok());
826        });
827    }
828
829    fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
830        runner.start(async move {
831            let handle = context.spawn_ref();
832            let result = handle(async move { 42 }).await;
833            assert_eq!(result, Ok(42));
834        });
835    }
836
837    fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
838        runner.start(async move {
839            let handle = context.spawn_ref();
840            let result = handle(async move { 42 }).await;
841            assert_eq!(result, Ok(42));
842
843            // Ensure context is consumed
844            let handle = context.spawn_ref();
845            let result = handle(async move { 42 }).await;
846            assert_eq!(result, Ok(42));
847        });
848    }
849
850    fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
851        runner.start(async move {
852            let handle = context.spawn_ref();
853            let result = handle(async move { 42 }).await;
854            assert_eq!(result, Ok(42));
855
856            // Ensure context is consumed
857            context.spawn(|_| async move { 42 });
858        });
859    }
860
861    fn test_spawn_blocking(runner: impl Runner, context: impl Spawner) {
862        runner.start(async move {
863            let handle = context.spawn_blocking(|| 42);
864            let result = handle.await;
865            assert_eq!(result, Ok(42));
866        });
867    }
868
869    fn test_spawn_blocking_abort(runner: impl Runner, context: impl Spawner) {
870        runner.start(async move {
871            // Create task
872            let (sender, mut receiver) = oneshot::channel();
873            let handle = context.spawn_blocking(move || {
874                // Wait for abort to be called
875                loop {
876                    if receiver.try_recv().is_ok() {
877                        break;
878                    }
879                }
880
881                // Perform a long-running operation
882                let mut count = 0;
883                loop {
884                    count += 1;
885                    if count >= 100_000_000 {
886                        break;
887                    }
888                }
889                count
890            });
891
892            // Abort the task
893            //
894            // If there was an `.await` prior to sending a message over the oneshot, this test
895            // could deadlock (depending on the runtime implementation) because the blocking task
896            // would never yield (preventing send from being called).
897            handle.abort();
898            sender.send(()).unwrap();
899
900            // Wait for the task to complete
901            assert_eq!(handle.await, Ok(100_000_000));
902        });
903    }
904
905    fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
906        runner.start(async move {
907            // Assert label
908            assert_eq!(context.label(), "");
909
910            // Register a metric
911            let counter = Counter::<u64>::default();
912            context.register("test", "test", counter.clone());
913
914            // Increment the counter
915            counter.inc();
916
917            // Encode metrics
918            let buffer = context.encode();
919            assert!(buffer.contains("test_total 1"));
920
921            // Nested context
922            let context = context.with_label("nested");
923            let nested_counter = Counter::<u64>::default();
924            context.register("test", "test", nested_counter.clone());
925
926            // Increment the counter
927            nested_counter.inc();
928
929            // Encode metrics
930            let buffer = context.encode();
931            assert!(buffer.contains("nested_test_total 1"));
932            assert!(buffer.contains("test_total 1"));
933        });
934    }
935
936    fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
937        runner.start(async move {
938            context.with_label(METRICS_PREFIX);
939        })
940    }
941
942    fn test_metrics_serve<L, Si, St>(
943        runner: impl Runner,
944        context: impl Clock + Spawner + Metrics + Network<L, Si, St>,
945    ) where
946        L: Listener<Si, St>,
947        Si: Sink,
948        St: Stream,
949    {
950        runner.start(async move {
951            // Register a test metric
952            let counter: Counter<u64> = Counter::default();
953            context.register("test_counter", "Test counter", counter.clone());
954            counter.inc();
955
956            // Define the server address
957            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
958
959            // Start the metrics server (serves one connection and exits)
960            context
961                .with_label("server")
962                .spawn(move |context| async move {
963                    metrics::server::serve(context, address).await;
964                });
965
966            // Helper functions to parse HTTP response
967            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
968                let mut line = Vec::new();
969                loop {
970                    let mut byte = [0; 1];
971                    stream.recv(&mut byte).await?;
972                    if byte[0] == b'\n' {
973                        if line.last() == Some(&b'\r') {
974                            line.pop(); // Remove trailing \r
975                        }
976                        break;
977                    }
978                    line.push(byte[0]);
979                }
980                String::from_utf8(line).map_err(|_| Error::ReadFailed)
981            }
982
983            async fn read_headers<St: Stream>(
984                stream: &mut St,
985            ) -> Result<HashMap<String, String>, Error> {
986                let mut headers = HashMap::new();
987                loop {
988                    let line = read_line(stream).await?;
989                    if line.is_empty() {
990                        break;
991                    }
992                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
993                    if parts.len() == 2 {
994                        headers.insert(parts[0].to_string(), parts[1].to_string());
995                    }
996                }
997                Ok(headers)
998            }
999
1000            async fn read_body<St: Stream>(
1001                stream: &mut St,
1002                content_length: usize,
1003            ) -> Result<String, Error> {
1004                let mut body = vec![0; content_length];
1005                stream.recv(&mut body).await?;
1006                String::from_utf8(body).map_err(|_| Error::ReadFailed)
1007            }
1008
1009            // Simulate a client connecting to the server
1010            let client_handle = context
1011                .with_label("client")
1012                .spawn(move |context| async move {
1013                    let (_, mut stream) = loop {
1014                        match context.dial(address).await {
1015                            Ok((sink, stream)) => break (sink, stream),
1016                            Err(e) => {
1017                                // The client may be polled before the server is ready, that's alright!
1018                                error!(err =?e, "failed to connect");
1019                                context.sleep(Duration::from_millis(10)).await;
1020                            }
1021                        }
1022                    };
1023
1024                    // Read and verify the HTTP status line
1025                    let status_line = read_line(&mut stream).await.unwrap();
1026                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1027
1028                    // Read and parse headers
1029                    let headers = read_headers(&mut stream).await.unwrap();
1030                    let content_length = headers
1031                        .get("Content-Length")
1032                        .unwrap()
1033                        .parse::<usize>()
1034                        .unwrap();
1035
1036                    // Read and verify the body
1037                    let body = read_body(&mut stream, content_length).await.unwrap();
1038                    assert!(body.contains("test_counter_total 1"));
1039                });
1040
1041            // Wait for the client task to complete
1042            client_handle.await.unwrap();
1043        });
1044    }
1045
1046    #[test]
1047    fn test_deterministic_future() {
1048        let (runner, _, _) = deterministic::Executor::default();
1049        test_error_future(runner);
1050    }
1051
1052    #[test]
1053    fn test_deterministic_clock_sleep() {
1054        let (executor, context, _) = deterministic::Executor::default();
1055        assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
1056        test_clock_sleep(executor, context);
1057    }
1058
1059    #[test]
1060    fn test_deterministic_clock_sleep_until() {
1061        let (executor, context, _) = deterministic::Executor::default();
1062        test_clock_sleep_until(executor, context);
1063    }
1064
1065    #[test]
1066    fn test_deterministic_root_finishes() {
1067        let (executor, context, _) = deterministic::Executor::default();
1068        test_root_finishes(executor, context);
1069    }
1070
1071    #[test]
1072    fn test_deterministic_spawn_abort() {
1073        let (executor, context, _) = deterministic::Executor::default();
1074        test_spawn_abort(executor, context);
1075    }
1076
1077    #[test]
1078    fn test_deterministic_panic_aborts_root() {
1079        let (runner, _, _) = deterministic::Executor::default();
1080        test_panic_aborts_root(runner);
1081    }
1082
1083    #[test]
1084    #[should_panic(expected = "blah")]
1085    fn test_deterministic_panic_aborts_spawn() {
1086        let (executor, context, _) = deterministic::Executor::default();
1087        test_panic_aborts_spawn(executor, context);
1088    }
1089
1090    #[test]
1091    fn test_deterministic_select() {
1092        let (executor, _, _) = deterministic::Executor::default();
1093        test_select(executor);
1094    }
1095
1096    #[test]
1097    fn test_deterministic_select_loop() {
1098        let (executor, context, _) = deterministic::Executor::default();
1099        test_select_loop(executor, context);
1100    }
1101
1102    #[test]
1103    fn test_deterministic_storage_operations() {
1104        let (executor, context, _) = deterministic::Executor::default();
1105        test_storage_operations(executor, context);
1106    }
1107
1108    #[test]
1109    fn test_deterministic_blob_read_write() {
1110        let (executor, context, _) = deterministic::Executor::default();
1111        test_blob_read_write(executor, context);
1112    }
1113
1114    #[test]
1115    fn test_deterministic_many_partition_read_write() {
1116        let (executor, context, _) = deterministic::Executor::default();
1117        test_many_partition_read_write(executor, context);
1118    }
1119
1120    #[test]
1121    fn test_deterministic_blob_read_past_length() {
1122        let (executor, context, _) = deterministic::Executor::default();
1123        test_blob_read_past_length(executor, context);
1124    }
1125
1126    #[test]
1127    fn test_deterministic_blob_clone_and_concurrent_read() {
1128        // Run test
1129        let (executor, context, _) = deterministic::Executor::default();
1130        test_blob_clone_and_concurrent_read(executor, context.clone());
1131
1132        // Ensure no blobs still open
1133        let buffer = context.encode();
1134        assert!(buffer.contains("open_blobs 0"));
1135    }
1136
1137    #[test]
1138    fn test_deterministic_shutdown() {
1139        let (executor, context, _) = deterministic::Executor::default();
1140        test_shutdown(executor, context);
1141    }
1142
1143    #[test]
1144    fn test_deterministic_spawn_ref() {
1145        let (executor, context, _) = deterministic::Executor::default();
1146        test_spawn_ref(executor, context);
1147    }
1148
1149    #[test]
1150    #[should_panic]
1151    fn test_deterministic_spawn_ref_duplicate() {
1152        let (executor, context, _) = deterministic::Executor::default();
1153        test_spawn_ref_duplicate(executor, context);
1154    }
1155
1156    #[test]
1157    #[should_panic]
1158    fn test_deterministic_spawn_duplicate() {
1159        let (executor, context, _) = deterministic::Executor::default();
1160        test_spawn_duplicate(executor, context);
1161    }
1162
1163    #[test]
1164    fn test_deterministic_spawn_blocking() {
1165        let (executor, context, _) = deterministic::Executor::default();
1166        test_spawn_blocking(executor, context);
1167    }
1168
1169    #[test]
1170    #[should_panic(expected = "blocking task panicked")]
1171    fn test_deterministic_spawn_blocking_panic() {
1172        let (executor, context, _) = deterministic::Executor::default();
1173        executor.start(async move {
1174            let handle = context.spawn_blocking(|| {
1175                panic!("blocking task panicked");
1176            });
1177            handle.await.unwrap();
1178        });
1179    }
1180
1181    #[test]
1182    fn test_deterministic_spawn_blocking_abort() {
1183        let (executor, context, _) = deterministic::Executor::default();
1184        test_spawn_blocking_abort(executor, context);
1185    }
1186
1187    #[test]
1188    fn test_deterministic_metrics() {
1189        let (executor, context, _) = deterministic::Executor::default();
1190        test_metrics(executor, context);
1191    }
1192
1193    #[test]
1194    #[should_panic]
1195    fn test_deterministic_metrics_label() {
1196        let (executor, context, _) = deterministic::Executor::default();
1197        test_metrics_label(executor, context);
1198    }
1199
1200    #[test]
1201    fn test_deterministic_metrics_serve() {
1202        let (executor, context, _) = deterministic::Executor::default();
1203        test_metrics_serve(executor, context);
1204    }
1205
1206    #[test]
1207    fn test_tokio_error_future() {
1208        let (runner, _) = tokio::Executor::default();
1209        test_error_future(runner);
1210    }
1211
1212    #[test]
1213    fn test_tokio_clock_sleep() {
1214        let (executor, context) = tokio::Executor::default();
1215        test_clock_sleep(executor, context);
1216    }
1217
1218    #[test]
1219    fn test_tokio_clock_sleep_until() {
1220        let (executor, context) = tokio::Executor::default();
1221        test_clock_sleep_until(executor, context);
1222    }
1223
1224    #[test]
1225    fn test_tokio_root_finishes() {
1226        let (executor, context) = tokio::Executor::default();
1227        test_root_finishes(executor, context);
1228    }
1229
1230    #[test]
1231    fn test_tokio_spawn_abort() {
1232        let (executor, context) = tokio::Executor::default();
1233        test_spawn_abort(executor, context);
1234    }
1235
1236    #[test]
1237    fn test_tokio_panic_aborts_root() {
1238        let (runner, _) = tokio::Executor::default();
1239        test_panic_aborts_root(runner);
1240    }
1241
1242    #[test]
1243    fn test_tokio_panic_aborts_spawn() {
1244        let (executor, context) = tokio::Executor::default();
1245        test_panic_aborts_spawn(executor, context);
1246    }
1247
1248    #[test]
1249    fn test_tokio_select() {
1250        let (executor, _) = tokio::Executor::default();
1251        test_select(executor);
1252    }
1253
1254    #[test]
1255    fn test_tokio_select_loop() {
1256        let (executor, context) = tokio::Executor::default();
1257        test_select_loop(executor, context);
1258    }
1259
1260    #[test]
1261    fn test_tokio_storage_operations() {
1262        let (executor, context) = tokio::Executor::default();
1263        test_storage_operations(executor, context);
1264    }
1265
1266    #[test]
1267    fn test_tokio_blob_read_write() {
1268        let (executor, context) = tokio::Executor::default();
1269        test_blob_read_write(executor, context);
1270    }
1271
1272    #[test]
1273    fn test_tokio_many_partition_read_write() {
1274        let (executor, context) = tokio::Executor::default();
1275        test_many_partition_read_write(executor, context);
1276    }
1277
1278    #[test]
1279    fn test_tokio_blob_read_past_length() {
1280        let (executor, context) = tokio::Executor::default();
1281        test_blob_read_past_length(executor, context);
1282    }
1283
1284    #[test]
1285    fn test_tokio_blob_clone_and_concurrent_read() {
1286        // Run test
1287        let (executor, context) = tokio::Executor::default();
1288        test_blob_clone_and_concurrent_read(executor, context.clone());
1289
1290        // Ensure no blobs still open
1291        let buffer = context.encode();
1292        assert!(buffer.contains("open_blobs 0"));
1293    }
1294
1295    #[test]
1296    fn test_tokio_shutdown() {
1297        let (executor, context) = tokio::Executor::default();
1298        test_shutdown(executor, context);
1299    }
1300
1301    #[test]
1302    fn test_tokio_spawn_ref() {
1303        let (executor, context) = tokio::Executor::default();
1304        test_spawn_ref(executor, context);
1305    }
1306
1307    #[test]
1308    #[should_panic]
1309    fn test_tokio_spawn_ref_duplicate() {
1310        let (executor, context) = tokio::Executor::default();
1311        test_spawn_ref_duplicate(executor, context);
1312    }
1313
1314    #[test]
1315    #[should_panic]
1316    fn test_tokio_spawn_duplicate() {
1317        let (executor, context) = tokio::Executor::default();
1318        test_spawn_duplicate(executor, context);
1319    }
1320
1321    #[test]
1322    fn test_tokio_spawn_blocking() {
1323        let (executor, context) = tokio::Executor::default();
1324        test_spawn_blocking(executor, context);
1325    }
1326
1327    #[test]
1328    fn test_tokio_spawn_blocking_panic() {
1329        let (executor, context) = tokio::Executor::default();
1330        executor.start(async move {
1331            let handle = context.spawn_blocking(|| {
1332                panic!("blocking task panicked");
1333            });
1334            let result = handle.await;
1335            assert_eq!(result, Err(Error::Exited));
1336        });
1337    }
1338
1339    #[test]
1340    fn test_tokio_spawn_blocking_abort() {
1341        let (executor, context) = tokio::Executor::default();
1342        test_spawn_blocking_abort(executor, context);
1343    }
1344
1345    #[test]
1346    fn test_tokio_metrics() {
1347        let (executor, context) = tokio::Executor::default();
1348        test_metrics(executor, context);
1349    }
1350
1351    #[test]
1352    #[should_panic]
1353    fn test_tokio_metrics_label() {
1354        let (executor, context) = tokio::Executor::default();
1355        test_metrics_label(executor, context);
1356    }
1357
1358    #[test]
1359    fn test_tokio_metrics_serve() {
1360        let (executor, context) = tokio::Executor::default();
1361        test_metrics_serve(executor, context);
1362    }
1363}