commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23    future::Future,
24    net::SocketAddr,
25    time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32    if #[cfg(not(target_arch = "wasm32"))] {
33        pub mod tokio;
34        pub mod benchmarks;
35    }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41    create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
/// Prefix reserved for metrics emitted by the runtime itself.
///
/// [Metrics::scoped_label] panics if a scoped label starts with this prefix.
const METRICS_PREFIX: &str = "runtime";
46
/// Errors that can occur when interacting with the runtime.
///
/// The `#[error(...)]` attribute on each variant defines its `Display` output.
#[derive(Error, Debug)]
pub enum Error {
    /// Displayed as "exited". The tests in this file expect this variant when awaiting the
    /// handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// Displayed as "closed". The tests in this file expect this variant when awaiting the
    /// handle of a task that was aborted.
    #[error("closed")]
    Closed,
    /// A timeout was hit.
    #[error("timeout")]
    Timeout,
    /// A bind operation failed.
    #[error("bind failed")]
    BindFailed,
    /// A connection attempt failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write operation failed.
    #[error("write failed")]
    WriteFailed,
    /// A read operation failed.
    #[error("read failed")]
    ReadFailed,
    /// A send operation failed.
    #[error("send failed")]
    SendFailed,
    /// A receive operation failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created. The string is included in the message
    /// (presumably the partition name; confirm against the storage implementations).
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist. The tests in this file expect this variant when scanning
    /// a partition that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt. The string is included in the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed. The two strings are rendered as `{0}/{1}` (presumably partition
    /// and blob name; confirm against the storage implementations), followed by the
    /// underlying I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist. The two strings are rendered as `{0}/{1}`.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Truncating a blob failed. Fields are rendered like [Error::BlobOpenFailed].
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    /// Syncing a blob failed. Fields are rendered like [Error::BlobOpenFailed].
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed. Fields are rendered like [Error::BlobOpenFailed].
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// A blob is shorter than an operation required.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
}
89
/// Interface that any task scheduler must implement to start
/// running tasks.
///
/// A `Runner` is consumed by [Runner::start], which drives a single root task.
pub trait Runner {
    /// Context defines the environment available to tasks.
    /// Example of possible services provided by the context include:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Start running a root task.
    ///
    /// `f` is called with the runner's [Runner::Context] and returns the root future. The
    /// value that future resolves to is returned to the caller.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
106
/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Enqueue a task to be executed.
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// Spawned tasks consume the context used to create them. This ensures that context cannot
    /// be shared between tasks and that a task's context always comes from somewhere.
    ///
    /// `f` receives the consumed context and returns the future to run. The returned
    /// [Handle] can be awaited to obtain the task's output.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a task to be executed (without consuming the context).
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
    /// an actor that already has a reference to context).
    ///
    /// This method returns a closure; calling that closure with a future spawns the future
    /// and yields its [Handle].
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
    /// to prevent accidental misuse.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed.
    ///
    /// This method is designed for synchronous, potentially long-running operations that should
    /// not block the asynchronous event loop. The task starts executing immediately, and the
    /// returned handle can be awaited to retrieve the result.
    ///
    /// # Warning
    ///
    /// Blocking tasks cannot be aborted.
    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals the runtime to stop execution and that all outstanding tasks
    /// should perform any required cleanup and exit. This method is idempotent and
    /// can be called multiple times.
    ///
    /// This method does not actually kill any tasks but rather signals to them, using
    /// the `Signal` returned by `stopped`, that they should exit.
    ///
    /// `value` is what tasks observe when their `Signal` resolves (the shutdown test in this
    /// file asserts that the resolved value equals the one passed here).
    fn stop(&self, value: i32);

    /// Returns an instance of a `Signal` that resolves when `stop` is called by
    /// any task.
    ///
    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
    fn stopped(&self) -> Signal;
}
167
168/// Interface to register and encode metrics.
169pub trait Metrics: Clone + Send + Sync + 'static {
170    /// Get the current label of the context.
171    fn label(&self) -> String;
172
173    /// Create a new instance of `Metrics` with the given label appended to the end
174    /// of the current `Metrics` label.
175    ///
176    /// This is commonly used to create a nested context for `register`.
177    ///
178    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
179    /// label (reserved for metrics for the runtime).
180    fn with_label(&self, label: &str) -> Self;
181
182    /// Prefix the given label with the current context's label.
183    ///
184    /// Unlike `with_label`, this method does not create a new context.
185    fn scoped_label(&self, label: &str) -> String {
186        let label = if self.label().is_empty() {
187            label.to_string()
188        } else {
189            format!("{}_{}", self.label(), label)
190        };
191        assert!(
192            !label.starts_with(METRICS_PREFIX),
193            "using runtime label is not allowed"
194        );
195        label
196    }
197
198    /// Register a metric with the runtime.
199    ///
200    /// Any registered metric will include (as a prefix) the label of the current context.
201    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203    /// Encode all metrics into a buffer.
204    fn encode(&self) -> String;
205}
206
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time.
    ///
    /// This is the runtime's notion of "now". Because time may be mocked, code that needs
    /// to measure elapsed time should compare values returned by this method.
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    ///
    /// The returned future is `Send + 'static`, so it does not borrow from `self`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    ///
    /// The returned future is `Send + 'static`, so it does not borrow from `self`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
222
/// Interface that any runtime must implement to create
/// network connections.
///
/// The type parameters name the runtime's [Listener] (`L`), [Sink] (`Si`) and
/// [Stream] (`St`) implementations.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
    L: Listener<Si, St>,
    Si: Sink,
    St: Stream,
{
    /// Bind to the given socket address.
    ///
    /// Resolves to a listener on success.
    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// Resolves to the sink/stream pair for the new connection on success.
    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
237
/// Interface that any runtime must implement to handle
/// incoming network connections.
pub trait Listener<Si, St>: Sync + Send + 'static
where
    Si: Sink,
    St: Stream,
{
    /// Accept an incoming connection.
    ///
    /// Resolves to a socket address (presumably the remote peer's; confirm against the
    /// implementations) together with the sink/stream pair for the accepted connection.
    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
248
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// Resolves to `Ok(())` on success or an [Error] on failure.
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
255
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer.
    ///
    /// No byte count is returned: success means `buf` was filled completely.
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
263
/// Interface to interact with storage.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage: Clone + Send + Sync + 'static {
    /// The readable/writeable storage buffer that can be opened by this Storage.
    type Blob: Blob;

    /// Open an existing blob in a given partition or create a new one, returning
    /// the blob and its length.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return all blobs in a given partition.
    ///
    /// Each entry is a blob name as raw bytes. The tests in this file expect
    /// [Error::PartitionMissing] when the partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
298
299/// Interface to read and write to a blob.
300///
301/// To support blob implementations that enable concurrent reads and
302/// writes, blobs are responsible for maintaining synchronization.
303///
304/// Cloning a blob is similar to wrapping a single file descriptor in
305/// a lock whereas opening a new blob (of the same name) is similar to
306/// opening a new file descriptor. If multiple blobs are opened with the same
307/// name, they are not expected to coordinate access to underlying storage
308/// and writing to both is undefined behavior.
309#[allow(clippy::len_without_is_empty)]
310pub trait Blob: Clone + Send + Sync + 'static {
311    /// Read from the blob at the given offset.
312    ///
313    /// `read_at` does not return the number of bytes read because it
314    /// only returns once the entire buffer has been filled.
315    fn read_at(
316        &self,
317        buf: &mut [u8],
318        offset: u64,
319    ) -> impl Future<Output = Result<(), Error>> + Send;
320
321    /// Write to the blob at the given offset.
322    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
323
324    /// Truncate the blob to the given length.
325    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
326
327    /// Ensure all pending data is durably persisted.
328    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
329
330    /// Close the blob.
331    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use commonware_macros::select;
338    use futures::channel::oneshot;
339    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
340    use prometheus_client::metrics::counter::Counter;
341    use std::collections::HashMap;
342    use std::panic::{catch_unwind, AssertUnwindSafe};
343    use std::str::FromStr;
344    use std::sync::Mutex;
345    use telemetry::metrics;
346    use tracing::error;
347    use utils::reschedule;
348
349    fn test_error_future<R: Runner>(runner: R) {
350        async fn error_future() -> Result<&'static str, &'static str> {
351            Err("An error occurred")
352        }
353        let result = runner.start(|_| error_future());
354        assert_eq!(result, Err("An error occurred"));
355    }
356
357    fn test_clock_sleep<R: Runner>(runner: R)
358    where
359        R::Context: Spawner + Clock,
360    {
361        runner.start(|context| async move {
362            // Capture initial time
363            let start = context.current();
364            let sleep_duration = Duration::from_millis(10);
365            context.sleep(sleep_duration).await;
366
367            // After run, time should have advanced
368            let end = context.current();
369            assert!(end.duration_since(start).unwrap() >= sleep_duration);
370        });
371    }
372
373    fn test_clock_sleep_until<R: Runner>(runner: R)
374    where
375        R::Context: Spawner + Clock,
376    {
377        runner.start(|context| async move {
378            // Trigger sleep
379            let now = context.current();
380            context.sleep_until(now + Duration::from_millis(100)).await;
381
382            // Ensure slept duration has elapsed
383            let elapsed = now.elapsed().unwrap();
384            assert!(elapsed >= Duration::from_millis(100));
385        });
386    }
387
388    fn test_root_finishes<R: Runner>(runner: R)
389    where
390        R::Context: Spawner,
391    {
392        runner.start(|context| async move {
393            context.spawn(|_| async move {
394                loop {
395                    reschedule().await;
396                }
397            });
398        });
399    }
400
401    fn test_spawn_abort<R: Runner>(runner: R)
402    where
403        R::Context: Spawner,
404    {
405        runner.start(|context| async move {
406            let handle = context.spawn(|_| async move {
407                loop {
408                    reschedule().await;
409                }
410            });
411            handle.abort();
412            assert!(matches!(handle.await, Err(Error::Closed)));
413        });
414    }
415
416    fn test_panic_aborts_root<R: Runner>(runner: R) {
417        let result = catch_unwind(AssertUnwindSafe(|| {
418            runner.start(|_| async move {
419                panic!("blah");
420            });
421        }));
422        result.unwrap_err();
423    }
424
425    fn test_panic_aborts_spawn<R: Runner>(runner: R)
426    where
427        R::Context: Spawner,
428    {
429        let result = runner.start(|context| async move {
430            let result = context.spawn(|_| async move {
431                panic!("blah");
432            });
433            assert!(matches!(result.await, Err(Error::Exited)));
434            Result::<(), Error>::Ok(())
435        });
436
437        // Ensure panic was caught
438        result.unwrap();
439    }
440
441    fn test_select<R: Runner>(runner: R) {
442        runner.start(|_| async move {
443            // Test first branch
444            let output = Mutex::new(0);
445            select! {
446                v1 = ready(1) => {
447                    *output.lock().unwrap() = v1;
448                },
449                v2 = ready(2) => {
450                    *output.lock().unwrap() = v2;
451                },
452            };
453            assert_eq!(*output.lock().unwrap(), 1);
454
455            // Test second branch
456            select! {
457                v1 = std::future::pending::<i32>() => {
458                    *output.lock().unwrap() = v1;
459                },
460                v2 = ready(2) => {
461                    *output.lock().unwrap() = v2;
462                },
463            };
464            assert_eq!(*output.lock().unwrap(), 2);
465        });
466    }
467
468    /// Ensure future fusing works as expected.
469    fn test_select_loop<R: Runner>(runner: R)
470    where
471        R::Context: Clock,
472    {
473        runner.start(|context| async move {
474            // Should hit timeout
475            let (mut sender, mut receiver) = mpsc::unbounded();
476            for _ in 0..2 {
477                select! {
478                    v = receiver.next() => {
479                        panic!("unexpected value: {:?}", v);
480                    },
481                    _ = context.sleep(Duration::from_millis(100)) => {
482                        continue;
483                    },
484                };
485            }
486
487            // Populate channel
488            sender.send(0).await.unwrap();
489            sender.send(1).await.unwrap();
490
491            // Prefer not reading channel without losing messages
492            select! {
493                _ = async {} => {
494                    // Skip reading from channel even though populated
495                },
496                v = receiver.next() => {
497                    panic!("unexpected value: {:?}", v);
498                },
499            };
500
501            // Process messages
502            for i in 0..2 {
503                select! {
504                    _ = context.sleep(Duration::from_millis(100)) => {
505                        panic!("timeout");
506                    },
507                    v = receiver.next() => {
508                        assert_eq!(v.unwrap(), i);
509                    },
510                };
511            }
512        });
513    }
514
515    fn test_storage_operations<R: Runner>(runner: R)
516    where
517        R::Context: Storage,
518    {
519        runner.start(|context| async move {
520            let partition = "test_partition";
521            let name = b"test_blob";
522
523            // Open a new blob
524            let (blob, _) = context
525                .open(partition, name)
526                .await
527                .expect("Failed to open blob");
528
529            // Write data to the blob
530            let data = b"Hello, Storage!";
531            blob.write_at(data, 0)
532                .await
533                .expect("Failed to write to blob");
534
535            // Sync the blob
536            blob.sync().await.expect("Failed to sync blob");
537
538            // Read data from the blob
539            let mut buffer = vec![0u8; data.len()];
540            blob.read_at(&mut buffer, 0)
541                .await
542                .expect("Failed to read from blob");
543            assert_eq!(&buffer, data);
544
545            // Close the blob
546            blob.close().await.expect("Failed to close blob");
547
548            // Scan blobs in the partition
549            let blobs = context
550                .scan(partition)
551                .await
552                .expect("Failed to scan partition");
553            assert!(blobs.contains(&name.to_vec()));
554
555            // Reopen the blob
556            let (blob, len) = context
557                .open(partition, name)
558                .await
559                .expect("Failed to reopen blob");
560            assert_eq!(len, data.len() as u64);
561
562            // Read data part of message back
563            let mut buffer = vec![0u8; 7];
564            blob.read_at(&mut buffer, 7)
565                .await
566                .expect("Failed to read data");
567            assert_eq!(&buffer, b"Storage");
568
569            // Close the blob
570            blob.close().await.expect("Failed to close blob");
571
572            // Remove the blob
573            context
574                .remove(partition, Some(name))
575                .await
576                .expect("Failed to remove blob");
577
578            // Ensure the blob is removed
579            let blobs = context
580                .scan(partition)
581                .await
582                .expect("Failed to scan partition");
583            assert!(!blobs.contains(&name.to_vec()));
584
585            // Remove the partition
586            context
587                .remove(partition, None)
588                .await
589                .expect("Failed to remove partition");
590
591            // Scan the partition
592            let result = context.scan(partition).await;
593            assert!(matches!(result, Err(Error::PartitionMissing(_))));
594        });
595    }
596
597    fn test_blob_read_write<R: Runner>(runner: R)
598    where
599        R::Context: Storage,
600    {
601        runner.start(|context| async move {
602            let partition = "test_partition";
603            let name = b"test_blob_rw";
604
605            // Open a new blob
606            let (blob, _) = context
607                .open(partition, name)
608                .await
609                .expect("Failed to open blob");
610
611            // Write data at different offsets
612            let data1 = b"Hello";
613            let data2 = b"World";
614            blob.write_at(data1, 0)
615                .await
616                .expect("Failed to write data1");
617            blob.write_at(data2, 5)
618                .await
619                .expect("Failed to write data2");
620
621            // Read data back
622            let mut buffer = vec![0u8; 10];
623            blob.read_at(&mut buffer, 0)
624                .await
625                .expect("Failed to read data");
626            assert_eq!(&buffer[..5], data1);
627            assert_eq!(&buffer[5..], data2);
628
629            // Rewrite data without affecting length
630            let data3 = b"Store";
631            blob.write_at(data3, 5)
632                .await
633                .expect("Failed to write data3");
634
635            // Truncate the blob
636            blob.truncate(5).await.expect("Failed to truncate blob");
637            let mut buffer = vec![0u8; 5];
638            blob.read_at(&mut buffer, 0)
639                .await
640                .expect("Failed to read data");
641            assert_eq!(&buffer[..5], data1);
642
643            // Full read after truncation
644            let mut buffer = vec![0u8; 10];
645            let result = blob.read_at(&mut buffer, 0).await;
646            assert!(result.is_err());
647
648            // Close the blob
649            blob.close().await.expect("Failed to close blob");
650        });
651    }
652
653    fn test_many_partition_read_write<R: Runner>(runner: R)
654    where
655        R::Context: Storage,
656    {
657        runner.start(|context| async move {
658            let partitions = ["partition1", "partition2", "partition3"];
659            let name = b"test_blob_rw";
660            let data1 = b"Hello";
661            let data2 = b"World";
662
663            for (additional, partition) in partitions.iter().enumerate() {
664                // Open a new blob
665                let (blob, _) = context
666                    .open(partition, name)
667                    .await
668                    .expect("Failed to open blob");
669
670                // Write data at different offsets
671                blob.write_at(data1, 0)
672                    .await
673                    .expect("Failed to write data1");
674                blob.write_at(data2, 5 + additional as u64)
675                    .await
676                    .expect("Failed to write data2");
677
678                // Close the blob
679                blob.close().await.expect("Failed to close blob");
680            }
681
682            for (additional, partition) in partitions.iter().enumerate() {
683                // Open a new blob
684                let (blob, len) = context
685                    .open(partition, name)
686                    .await
687                    .expect("Failed to open blob");
688                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
689
690                // Read data back
691                let mut buffer = vec![0u8; 10 + additional];
692                blob.read_at(&mut buffer, 0)
693                    .await
694                    .expect("Failed to read data");
695                assert_eq!(&buffer[..5], b"Hello");
696                assert_eq!(&buffer[5 + additional..], b"World");
697
698                // Close the blob
699                blob.close().await.expect("Failed to close blob");
700            }
701        });
702    }
703
704    fn test_blob_read_past_length<R: Runner>(runner: R)
705    where
706        R::Context: Storage,
707    {
708        runner.start(|context| async move {
709            let partition = "test_partition";
710            let name = b"test_blob_rw";
711
712            // Open a new blob
713            let (blob, _) = context
714                .open(partition, name)
715                .await
716                .expect("Failed to open blob");
717
718            // Read data past file length (empty file)
719            let mut buffer = vec![0u8; 10];
720            let result = blob.read_at(&mut buffer, 0).await;
721            assert!(result.is_err());
722
723            // Write data to the blob
724            let data = b"Hello, Storage!";
725            blob.write_at(data, 0)
726                .await
727                .expect("Failed to write to blob");
728
729            // Read data past file length (non-empty file)
730            let mut buffer = vec![0u8; 20];
731            let result = blob.read_at(&mut buffer, 0).await;
732            assert!(result.is_err());
733        })
734    }
735
736    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
737    where
738        R::Context: Spawner + Storage + Metrics,
739    {
740        runner.start(|context| async move {
741            let partition = "test_partition";
742            let name = b"test_blob_rw";
743
744            // Open a new blob
745            let (blob, _) = context
746                .open(partition, name)
747                .await
748                .expect("Failed to open blob");
749
750            // Write data to the blob
751            let data = b"Hello, Storage!";
752            blob.write_at(data, 0)
753                .await
754                .expect("Failed to write to blob");
755
756            // Sync the blob
757            blob.sync().await.expect("Failed to sync blob");
758
759            // Read data from the blob in clone
760            let check1 = context.with_label("check1").spawn({
761                let blob = blob.clone();
762                move |_| async move {
763                    let mut buffer = vec![0u8; data.len()];
764                    blob.read_at(&mut buffer, 0)
765                        .await
766                        .expect("Failed to read from blob");
767                    assert_eq!(&buffer, data);
768                }
769            });
770            let check2 = context.with_label("check2").spawn({
771                let blob = blob.clone();
772                move |_| async move {
773                    let mut buffer = vec![0u8; data.len()];
774                    blob.read_at(&mut buffer, 0)
775                        .await
776                        .expect("Failed to read from blob");
777                    assert_eq!(&buffer, data);
778                }
779            });
780
781            // Wait for both reads to complete
782            let result = join!(check1, check2);
783            assert!(result.0.is_ok());
784            assert!(result.1.is_ok());
785
786            // Read data from the blob
787            let mut buffer = vec![0u8; data.len()];
788            blob.read_at(&mut buffer, 0)
789                .await
790                .expect("Failed to read from blob");
791            assert_eq!(&buffer, data);
792
793            // Close the blob
794            blob.close().await.expect("Failed to close blob");
795
796            // Ensure no blobs still open
797            let buffer = context.encode();
798            assert!(buffer.contains("open_blobs 0"));
799        });
800    }
801
802    fn test_shutdown<R: Runner>(runner: R)
803    where
804        R::Context: Spawner + Metrics + Clock,
805    {
806        let kill = 9;
807        runner.start(|context| async move {
808            // Spawn a task that waits for signal
809            let before = context
810                .with_label("before")
811                .spawn(move |context| async move {
812                    let sig = context.stopped().await;
813                    assert_eq!(sig.unwrap(), kill);
814                });
815
816            // Spawn a task after stop is called
817            let after = context
818                .with_label("after")
819                .spawn(move |context| async move {
820                    // Wait for stop signal
821                    let mut signal = context.stopped();
822                    loop {
823                        select! {
824                            sig = &mut signal => {
825                                // Stopper resolved
826                                assert_eq!(sig.unwrap(), kill);
827                                break;
828                            },
829                            _ = context.sleep(Duration::from_millis(10)) => {
830                                // Continue waiting
831                            },
832                        }
833                    }
834                });
835
836            // Sleep for a bit before stopping
837            context.sleep(Duration::from_millis(50)).await;
838
839            // Signal the task
840            context.stop(kill);
841
842            // Ensure both tasks complete
843            let result = join!(before, after);
844            assert!(result.0.is_ok());
845            assert!(result.1.is_ok());
846        });
847    }
848
849    fn test_spawn_ref<R: Runner>(runner: R)
850    where
851        R::Context: Spawner,
852    {
853        runner.start(|mut context| async move {
854            let handle = context.spawn_ref();
855            let result = handle(async move { 42 }).await;
856            assert!(matches!(result, Ok(42)));
857        });
858    }
859
860    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
861    where
862        R::Context: Spawner,
863    {
864        runner.start(|mut context| async move {
865            let handle = context.spawn_ref();
866            let result = handle(async move { 42 }).await;
867            assert!(matches!(result, Ok(42)));
868
869            // Ensure context is consumed
870            let handle = context.spawn_ref();
871            let result = handle(async move { 42 }).await;
872            assert!(matches!(result, Ok(42)));
873        });
874    }
875
876    fn test_spawn_duplicate<R: Runner>(runner: R)
877    where
878        R::Context: Spawner,
879    {
880        runner.start(|mut context| async move {
881            let handle = context.spawn_ref();
882            let result = handle(async move { 42 }).await;
883            assert!(matches!(result, Ok(42)));
884
885            // Ensure context is consumed
886            context.spawn(|_| async move { 42 });
887        });
888    }
889
890    fn test_spawn_blocking<R: Runner>(runner: R)
891    where
892        R::Context: Spawner,
893    {
894        runner.start(|context| async move {
895            let handle = context.spawn_blocking(|| 42);
896            let result = handle.await;
897            assert!(matches!(result, Ok(42)));
898        });
899    }
900
901    fn test_spawn_blocking_abort<R: Runner>(runner: R)
902    where
903        R::Context: Spawner,
904    {
905        runner.start(|context| async move {
906            // Create task
907            let (sender, mut receiver) = oneshot::channel();
908            let handle = context.spawn_blocking(move || {
909                // Wait for abort to be called
910                loop {
911                    if receiver.try_recv().is_ok() {
912                        break;
913                    }
914                }
915
916                // Perform a long-running operation
917                let mut count = 0;
918                loop {
919                    count += 1;
920                    if count >= 100_000_000 {
921                        break;
922                    }
923                }
924                count
925            });
926
927            // Abort the task
928            //
929            // If there was an `.await` prior to sending a message over the oneshot, this test
930            // could deadlock (depending on the runtime implementation) because the blocking task
931            // would never yield (preventing send from being called).
932            handle.abort();
933            sender.send(()).unwrap();
934
935            // Wait for the task to complete
936            assert!(matches!(handle.await, Ok(100_000_000)));
937        });
938    }
939
940    fn test_metrics<R: Runner>(runner: R)
941    where
942        R::Context: Metrics,
943    {
944        runner.start(|context| async move {
945            // Assert label
946            assert_eq!(context.label(), "");
947
948            // Register a metric
949            let counter = Counter::<u64>::default();
950            context.register("test", "test", counter.clone());
951
952            // Increment the counter
953            counter.inc();
954
955            // Encode metrics
956            let buffer = context.encode();
957            assert!(buffer.contains("test_total 1"));
958
959            // Nested context
960            let context = context.with_label("nested");
961            let nested_counter = Counter::<u64>::default();
962            context.register("test", "test", nested_counter.clone());
963
964            // Increment the counter
965            nested_counter.inc();
966
967            // Encode metrics
968            let buffer = context.encode();
969            assert!(buffer.contains("nested_test_total 1"));
970            assert!(buffer.contains("test_total 1"));
971        });
972    }
973
974    fn test_metrics_label<R: Runner>(runner: R)
975    where
976        R::Context: Metrics,
977    {
978        runner.start(|context| async move {
979            context.with_label(METRICS_PREFIX);
980        })
981    }
982
983    fn test_metrics_serve<R, L, Si, St>(runner: R)
984    where
985        R: Runner,
986        R::Context: Spawner + Metrics + Network<L, Si, St> + Clock,
987        L: Listener<Si, St>,
988        Si: Sink,
989        St: Stream,
990    {
991        runner.start(|context| async move {
992            // Register a test metric
993            let counter: Counter<u64> = Counter::default();
994            context.register("test_counter", "Test counter", counter.clone());
995            counter.inc();
996
997            // Define the server address
998            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
999
1000            // Start the metrics server (serves one connection and exits)
1001            context
1002                .with_label("server")
1003                .spawn(move |context| async move {
1004                    metrics::server::serve(context, address).await;
1005                });
1006
1007            // Helper functions to parse HTTP response
1008            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1009                let mut line = Vec::new();
1010                loop {
1011                    let mut byte = [0; 1];
1012                    stream.recv(&mut byte).await?;
1013                    if byte[0] == b'\n' {
1014                        if line.last() == Some(&b'\r') {
1015                            line.pop(); // Remove trailing \r
1016                        }
1017                        break;
1018                    }
1019                    line.push(byte[0]);
1020                }
1021                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1022            }
1023
1024            async fn read_headers<St: Stream>(
1025                stream: &mut St,
1026            ) -> Result<HashMap<String, String>, Error> {
1027                let mut headers = HashMap::new();
1028                loop {
1029                    let line = read_line(stream).await?;
1030                    if line.is_empty() {
1031                        break;
1032                    }
1033                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1034                    if parts.len() == 2 {
1035                        headers.insert(parts[0].to_string(), parts[1].to_string());
1036                    }
1037                }
1038                Ok(headers)
1039            }
1040
1041            async fn read_body<St: Stream>(
1042                stream: &mut St,
1043                content_length: usize,
1044            ) -> Result<String, Error> {
1045                let mut body = vec![0; content_length];
1046                stream.recv(&mut body).await?;
1047                String::from_utf8(body).map_err(|_| Error::ReadFailed)
1048            }
1049
1050            // Simulate a client connecting to the server
1051            let client_handle = context
1052                .with_label("client")
1053                .spawn(move |context| async move {
1054                    let (_, mut stream) = loop {
1055                        match context.dial(address).await {
1056                            Ok((sink, stream)) => break (sink, stream),
1057                            Err(e) => {
1058                                // The client may be polled before the server is ready, that's alright!
1059                                error!(err =?e, "failed to connect");
1060                                context.sleep(Duration::from_millis(10)).await;
1061                            }
1062                        }
1063                    };
1064
1065                    // Read and verify the HTTP status line
1066                    let status_line = read_line(&mut stream).await.unwrap();
1067                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1068
1069                    // Read and parse headers
1070                    let headers = read_headers(&mut stream).await.unwrap();
1071                    let content_length = headers
1072                        .get("Content-Length")
1073                        .unwrap()
1074                        .parse::<usize>()
1075                        .unwrap();
1076
1077                    // Read and verify the body
1078                    let body = read_body(&mut stream, content_length).await.unwrap();
1079                    assert!(body.contains("test_counter_total 1"));
1080                });
1081
1082            // Wait for the client task to complete
1083            client_handle.await.unwrap();
1084        });
1085    }
1086
1087    #[test]
1088    fn test_deterministic_future() {
1089        let runner = deterministic::Runner::default();
1090        test_error_future(runner);
1091    }
1092
1093    #[test]
1094    fn test_deterministic_clock_sleep() {
1095        let executor = deterministic::Runner::default();
1096        test_clock_sleep(executor);
1097    }
1098
1099    #[test]
1100    fn test_deterministic_clock_sleep_until() {
1101        let executor = deterministic::Runner::default();
1102        test_clock_sleep_until(executor);
1103    }
1104
1105    #[test]
1106    fn test_deterministic_root_finishes() {
1107        let executor = deterministic::Runner::default();
1108        test_root_finishes(executor);
1109    }
1110
1111    #[test]
1112    fn test_deterministic_spawn_abort() {
1113        let executor = deterministic::Runner::default();
1114        test_spawn_abort(executor);
1115    }
1116
1117    #[test]
1118    fn test_deterministic_panic_aborts_root() {
1119        let runner = deterministic::Runner::default();
1120        test_panic_aborts_root(runner);
1121    }
1122
1123    #[test]
1124    #[should_panic(expected = "blah")]
1125    fn test_deterministic_panic_aborts_spawn() {
1126        let executor = deterministic::Runner::default();
1127        test_panic_aborts_spawn(executor);
1128    }
1129
1130    #[test]
1131    fn test_deterministic_select() {
1132        let executor = deterministic::Runner::default();
1133        test_select(executor);
1134    }
1135
1136    #[test]
1137    fn test_deterministic_select_loop() {
1138        let executor = deterministic::Runner::default();
1139        test_select_loop(executor);
1140    }
1141
1142    #[test]
1143    fn test_deterministic_storage_operations() {
1144        let executor = deterministic::Runner::default();
1145        test_storage_operations(executor);
1146    }
1147
1148    #[test]
1149    fn test_deterministic_blob_read_write() {
1150        let executor = deterministic::Runner::default();
1151        test_blob_read_write(executor);
1152    }
1153
1154    #[test]
1155    fn test_deterministic_many_partition_read_write() {
1156        let executor = deterministic::Runner::default();
1157        test_many_partition_read_write(executor);
1158    }
1159
1160    #[test]
1161    fn test_deterministic_blob_read_past_length() {
1162        let executor = deterministic::Runner::default();
1163        test_blob_read_past_length(executor);
1164    }
1165
1166    #[test]
1167    fn test_deterministic_blob_clone_and_concurrent_read() {
1168        // Run test
1169        let executor = deterministic::Runner::default();
1170        test_blob_clone_and_concurrent_read(executor);
1171    }
1172
1173    #[test]
1174    fn test_deterministic_shutdown() {
1175        let executor = deterministic::Runner::default();
1176        test_shutdown(executor);
1177    }
1178
1179    #[test]
1180    fn test_deterministic_spawn_ref() {
1181        let executor = deterministic::Runner::default();
1182        test_spawn_ref(executor);
1183    }
1184
1185    #[test]
1186    #[should_panic]
1187    fn test_deterministic_spawn_ref_duplicate() {
1188        let executor = deterministic::Runner::default();
1189        test_spawn_ref_duplicate(executor);
1190    }
1191
1192    #[test]
1193    #[should_panic]
1194    fn test_deterministic_spawn_duplicate() {
1195        let executor = deterministic::Runner::default();
1196        test_spawn_duplicate(executor);
1197    }
1198
1199    #[test]
1200    fn test_deterministic_spawn_blocking() {
1201        let executor = deterministic::Runner::default();
1202        test_spawn_blocking(executor);
1203    }
1204
1205    #[test]
1206    #[should_panic(expected = "blocking task panicked")]
1207    fn test_deterministic_spawn_blocking_panic() {
1208        let executor = deterministic::Runner::default();
1209        executor.start(|context| async move {
1210            let handle = context.spawn_blocking(|| {
1211                panic!("blocking task panicked");
1212            });
1213            handle.await.unwrap();
1214        });
1215    }
1216
1217    #[test]
1218    fn test_deterministic_spawn_blocking_abort() {
1219        let executor = deterministic::Runner::default();
1220        test_spawn_blocking_abort(executor);
1221    }
1222
1223    #[test]
1224    fn test_deterministic_metrics() {
1225        let executor = deterministic::Runner::default();
1226        test_metrics(executor);
1227    }
1228
1229    #[test]
1230    #[should_panic]
1231    fn test_deterministic_metrics_label() {
1232        let executor = deterministic::Runner::default();
1233        test_metrics_label(executor);
1234    }
1235
1236    #[test]
1237    fn test_deterministic_metrics_serve() {
1238        let executor = deterministic::Runner::default();
1239        test_metrics_serve(executor);
1240    }
1241
1242    #[test]
1243    fn test_tokio_error_future() {
1244        let runner = tokio::Runner::default();
1245        test_error_future(runner);
1246    }
1247
1248    #[test]
1249    fn test_tokio_clock_sleep() {
1250        let executor = tokio::Runner::default();
1251        test_clock_sleep(executor);
1252    }
1253
1254    #[test]
1255    fn test_tokio_clock_sleep_until() {
1256        let executor = tokio::Runner::default();
1257        test_clock_sleep_until(executor);
1258    }
1259
1260    #[test]
1261    fn test_tokio_root_finishes() {
1262        let executor = tokio::Runner::default();
1263        test_root_finishes(executor);
1264    }
1265
1266    #[test]
1267    fn test_tokio_spawn_abort() {
1268        let executor = tokio::Runner::default();
1269        test_spawn_abort(executor);
1270    }
1271
1272    #[test]
1273    fn test_tokio_panic_aborts_root() {
1274        let executor = tokio::Runner::default();
1275        test_panic_aborts_root(executor);
1276    }
1277
1278    #[test]
1279    fn test_tokio_panic_aborts_spawn() {
1280        let executor = tokio::Runner::default();
1281        test_panic_aborts_spawn(executor);
1282    }
1283
1284    #[test]
1285    fn test_tokio_select() {
1286        let executor = tokio::Runner::default();
1287        test_select(executor);
1288    }
1289
1290    #[test]
1291    fn test_tokio_select_loop() {
1292        let executor = tokio::Runner::default();
1293        test_select_loop(executor);
1294    }
1295
1296    #[test]
1297    fn test_tokio_storage_operations() {
1298        let executor = tokio::Runner::default();
1299        test_storage_operations(executor);
1300    }
1301
1302    #[test]
1303    fn test_tokio_blob_read_write() {
1304        let executor = tokio::Runner::default();
1305        test_blob_read_write(executor);
1306    }
1307
1308    #[test]
1309    fn test_tokio_many_partition_read_write() {
1310        let executor = tokio::Runner::default();
1311        test_many_partition_read_write(executor);
1312    }
1313
1314    #[test]
1315    fn test_tokio_blob_read_past_length() {
1316        let executor = tokio::Runner::default();
1317        test_blob_read_past_length(executor);
1318    }
1319
1320    #[test]
1321    fn test_tokio_blob_clone_and_concurrent_read() {
1322        // Run test
1323        let executor = tokio::Runner::default();
1324        test_blob_clone_and_concurrent_read(executor);
1325    }
1326
1327    #[test]
1328    fn test_tokio_shutdown() {
1329        let executor = tokio::Runner::default();
1330        test_shutdown(executor);
1331    }
1332
1333    #[test]
1334    fn test_tokio_spawn_ref() {
1335        let executor = tokio::Runner::default();
1336        test_spawn_ref(executor);
1337    }
1338
1339    #[test]
1340    #[should_panic]
1341    fn test_tokio_spawn_ref_duplicate() {
1342        let executor = tokio::Runner::default();
1343        test_spawn_ref_duplicate(executor);
1344    }
1345
1346    #[test]
1347    #[should_panic]
1348    fn test_tokio_spawn_duplicate() {
1349        let executor = tokio::Runner::default();
1350        test_spawn_duplicate(executor);
1351    }
1352
1353    #[test]
1354    fn test_tokio_spawn_blocking() {
1355        let executor = tokio::Runner::default();
1356        test_spawn_blocking(executor);
1357    }
1358
1359    #[test]
1360    fn test_tokio_spawn_blocking_panic() {
1361        let executor = tokio::Runner::default();
1362        executor.start(|context| async move {
1363            let handle = context.spawn_blocking(|| {
1364                panic!("blocking task panicked");
1365            });
1366            let result = handle.await;
1367            assert!(matches!(result, Err(Error::Exited)));
1368        });
1369    }
1370
1371    #[test]
1372    fn test_tokio_spawn_blocking_abort() {
1373        let executor = tokio::Runner::default();
1374        test_spawn_blocking_abort(executor);
1375    }
1376
1377    #[test]
1378    fn test_tokio_metrics() {
1379        let executor = tokio::Runner::default();
1380        test_metrics(executor);
1381    }
1382
1383    #[test]
1384    #[should_panic]
1385    fn test_tokio_metrics_label() {
1386        let executor = tokio::Runner::default();
1387        test_metrics_label(executor);
1388    }
1389
1390    #[test]
1391    fn test_tokio_metrics_serve() {
1392        let executor = tokio::Runner::default();
1393        test_metrics_serve(executor);
1394    }
1395}