commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23    future::Future,
24    net::SocketAddr,
25    time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32    if #[cfg(not(target_arch = "wasm32"))] {
33        pub mod tokio;
34        pub mod benchmarks;
35    }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41    create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
/// Label prefix reserved for metrics emitted by the runtime itself.
///
/// User-supplied labels must not begin with this value (enforced by `Metrics::scoped_label`).
const METRICS_PREFIX: &str = "runtime";
46
47/// Errors that can occur when interacting with the runtime.
48#[derive(Error, Debug)]
49pub enum Error {
50    #[error("exited")]
51    Exited,
52    #[error("closed")]
53    Closed,
54    #[error("timeout")]
55    Timeout,
56    #[error("bind failed")]
57    BindFailed,
58    #[error("connection failed")]
59    ConnectionFailed,
60    #[error("write failed")]
61    WriteFailed,
62    #[error("read failed")]
63    ReadFailed,
64    #[error("send failed")]
65    SendFailed,
66    #[error("recv failed")]
67    RecvFailed,
68    #[error("partition creation failed: {0}")]
69    PartitionCreationFailed(String),
70    #[error("partition missing: {0}")]
71    PartitionMissing(String),
72    #[error("partition corrupt: {0}")]
73    PartitionCorrupt(String),
74    #[error("blob open failed: {0}/{1} error: {2}")]
75    BlobOpenFailed(String, String, IoError),
76    #[error("blob missing: {0}/{1}")]
77    BlobMissing(String, String),
78    #[error("blob truncate failed: {0}/{1} error: {2}")]
79    BlobTruncateFailed(String, String, IoError),
80    #[error("blob sync failed: {0}/{1} error: {2}")]
81    BlobSyncFailed(String, String, IoError),
82    #[error("blob close failed: {0}/{1} error: {2}")]
83    BlobCloseFailed(String, String, IoError),
84    #[error("blob insufficient length")]
85    BlobInsufficientLength,
86    #[error("offset overflow")]
87    OffsetOverflow,
88}
89
/// Entry point every task scheduler implements in order to begin executing tasks.
pub trait Runner {
    /// Environment handed to the root task.
    ///
    /// Depending on the runtime, it may offer services such as:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Run `f` as the root task and return the output of the future it produces.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        Fut: Future,
        F: FnOnce(Self::Context) -> Fut;
}
106
107/// Interface that any task scheduler must implement to spawn tasks.
108pub trait Spawner: Clone + Send + Sync + 'static {
109    /// Enqueue a task to be executed.
110    ///
111    /// Unlike a future, a spawned task will start executing immediately (even if the caller
112    /// does not await the handle).
113    ///
114    /// Spawned tasks consume the context used to create them. This ensures that context cannot
115    /// be shared between tasks and that a task's context always comes from somewhere.
116    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
117    where
118        F: FnOnce(Self) -> Fut + Send + 'static,
119        Fut: Future<Output = T> + Send + 'static,
120        T: Send + 'static;
121
122    /// Enqueue a task to be executed (without consuming the context).
123    ///
124    /// Unlike a future, a spawned task will start executing immediately (even if the caller
125    /// does not await the handle).
126    ///
127    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
128    /// an actor that already has a reference to context).
129    ///
130    /// # Warning
131    ///
132    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
133    /// to prevent accidental misuse.
134    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
135    where
136        F: Future<Output = T> + Send + 'static,
137        T: Send + 'static;
138
139    /// Enqueue a blocking task to be executed.
140    ///
141    /// This method is designed for synchronous, potentially long-running operations that should
142    /// not block the asynchronous event loop. The task starts executing immediately, and the
143    /// returned handle can be awaited to retrieve the result.
144    ///
145    /// # Warning
146    ///
147    /// Blocking tasks cannot be aborted.
148    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
149    where
150        F: FnOnce() -> T + Send + 'static,
151        T: Send + 'static;
152
153    /// Signals the runtime to stop execution and that all outstanding tasks
154    /// should perform any required cleanup and exit. This method is idempotent and
155    /// can be called multiple times.
156    ///
157    /// This method does not actually kill any tasks but rather signals to them, using
158    /// the `Signal` returned by `stopped`, that they should exit.
159    fn stop(&self, value: i32);
160
161    /// Returns an instance of a `Signal` that resolves when `stop` is called by
162    /// any task.
163    ///
164    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
165    fn stopped(&self) -> Signal;
166}
167
168/// Interface to register and encode metrics.
169pub trait Metrics: Clone + Send + Sync + 'static {
170    /// Get the current label of the context.
171    fn label(&self) -> String;
172
173    /// Create a new instance of `Metrics` with the given label appended to the end
174    /// of the current `Metrics` label.
175    ///
176    /// This is commonly used to create a nested context for `register`.
177    ///
178    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
179    /// label (reserved for metrics for the runtime).
180    fn with_label(&self, label: &str) -> Self;
181
182    /// Prefix the given label with the current context's label.
183    ///
184    /// Unlike `with_label`, this method does not create a new context.
185    fn scoped_label(&self, label: &str) -> String {
186        let label = if self.label().is_empty() {
187            label.to_string()
188        } else {
189            format!("{}_{}", self.label(), label)
190        };
191        assert!(
192            !label.starts_with(METRICS_PREFIX),
193            "using runtime label is not allowed"
194        );
195        label
196    }
197
198    /// Register a metric with the runtime.
199    ///
200    /// Any registered metric will include (as a prefix) the label of the current context.
201    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203    /// Encode all metrics into a buffer.
204    fn encode(&self) -> String;
205}
206
/// Time-related operations that every task scheduler must provide.
///
/// Routing all access to time through this trait lets a runtime substitute a mocked clock,
/// which is required for deterministic execution of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// The current time according to this clock.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes once `duration` has passed.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
222
223/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
224pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
225
226/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
227pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
228
229/// Interface that any runtime must implement to create
230/// network connections.
231pub trait Network: Clone + Send + Sync + 'static {
232    /// The type of [Listener] that's returned when binding to a socket.
233    /// Accepting a connection returns a [Sink] and [Stream] which are defined
234    /// by the [Listener] and used to send and receive data over the connection.
235    type Listener: Listener;
236
237    /// Bind to the given socket address.
238    fn bind(
239        &self,
240        socket: SocketAddr,
241    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
242
243    /// Dial the given socket address.
244    fn dial(
245        &self,
246        socket: SocketAddr,
247    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
248}
249
250/// Interface that any runtime must implement to handle
251/// incoming network connections.
252pub trait Listener: Sync + Send + 'static {
253    /// The type of [Sink] that's returned when accepting a connection.
254    /// This is used to send data to the remote connection.
255    type Sink: Sink;
256    /// The type of [Stream] that's returned when accepting a connection.
257    /// This is used to receive data from the remote connection.
258    type Stream: Stream;
259
260    /// Accept an incoming connection.
261    fn accept(
262        &mut self,
263    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
264}
265
266/// Interface that any runtime must implement to send
267/// messages over a network connection.
268pub trait Sink: Sync + Send + 'static {
269    /// Send a message to the sink.
270    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
271}
272
273/// Interface that any runtime must implement to receive
274/// messages over a network connection.
275pub trait Stream: Sync + Send + 'static {
276    /// Receive a message from the stream, storing it in the given buffer.
277    /// Reads exactly the number of bytes that fit in the buffer.
278    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
279}
280
281/// Interface to interact with storage.
282///
283///
284/// To support storage implementations that enable concurrent reads and
285/// writes, blobs are responsible for maintaining synchronization.
286///
287/// Storage can be backed by a local filesystem, cloud storage, etc.
288pub trait Storage: Clone + Send + Sync + 'static {
289    /// The readable/writeable storage buffer that can be opened by this Storage.
290    type Blob: Blob;
291
292    /// Open an existing blob in a given partition or create a new one, returning
293    /// the blob and its length.
294    ///
295    /// Multiple instances of the same blob can be opened concurrently, however,
296    /// writing to the same blob concurrently may lead to undefined behavior.
297    fn open(
298        &self,
299        partition: &str,
300        name: &[u8],
301    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
302
303    /// Remove a blob from a given partition.
304    ///
305    /// If no `name` is provided, the entire partition is removed.
306    fn remove(
307        &self,
308        partition: &str,
309        name: Option<&[u8]>,
310    ) -> impl Future<Output = Result<(), Error>> + Send;
311
312    /// Return all blobs in a given partition.
313    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
314}
315
316/// Interface to read and write to a blob.
317///
318/// To support blob implementations that enable concurrent reads and
319/// writes, blobs are responsible for maintaining synchronization.
320///
321/// Cloning a blob is similar to wrapping a single file descriptor in
322/// a lock whereas opening a new blob (of the same name) is similar to
323/// opening a new file descriptor. If multiple blobs are opened with the same
324/// name, they are not expected to coordinate access to underlying storage
325/// and writing to both is undefined behavior.
326#[allow(clippy::len_without_is_empty)]
327pub trait Blob: Clone + Send + Sync + 'static {
328    /// Read from the blob at the given offset.
329    ///
330    /// `read_at` does not return the number of bytes read because it
331    /// only returns once the entire buffer has been filled.
332    fn read_at(
333        &self,
334        buf: &mut [u8],
335        offset: u64,
336    ) -> impl Future<Output = Result<(), Error>> + Send;
337
338    /// Write to the blob at the given offset.
339    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
340
341    /// Truncate the blob to the given length.
342    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
343
344    /// Ensure all pending data is durably persisted.
345    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
346
347    /// Close the blob.
348    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
349}
350
351#[cfg(test)]
352mod tests {
353    use super::*;
354    use commonware_macros::select;
355    use futures::channel::oneshot;
356    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
357    use prometheus_client::metrics::counter::Counter;
358    use std::collections::HashMap;
359    use std::panic::{catch_unwind, AssertUnwindSafe};
360    use std::str::FromStr;
361    use std::sync::Mutex;
362    use tracing::{error, Level};
363    use utils::reschedule;
364
365    fn test_error_future<R: Runner>(runner: R) {
366        async fn error_future() -> Result<&'static str, &'static str> {
367            Err("An error occurred")
368        }
369        let result = runner.start(|_| error_future());
370        assert_eq!(result, Err("An error occurred"));
371    }
372
373    fn test_clock_sleep<R: Runner>(runner: R)
374    where
375        R::Context: Spawner + Clock,
376    {
377        runner.start(|context| async move {
378            // Capture initial time
379            let start = context.current();
380            let sleep_duration = Duration::from_millis(10);
381            context.sleep(sleep_duration).await;
382
383            // After run, time should have advanced
384            let end = context.current();
385            assert!(end.duration_since(start).unwrap() >= sleep_duration);
386        });
387    }
388
389    fn test_clock_sleep_until<R: Runner>(runner: R)
390    where
391        R::Context: Spawner + Clock,
392    {
393        runner.start(|context| async move {
394            // Trigger sleep
395            let now = context.current();
396            context.sleep_until(now + Duration::from_millis(100)).await;
397
398            // Ensure slept duration has elapsed
399            let elapsed = now.elapsed().unwrap();
400            assert!(elapsed >= Duration::from_millis(100));
401        });
402    }
403
404    fn test_root_finishes<R: Runner>(runner: R)
405    where
406        R::Context: Spawner,
407    {
408        runner.start(|context| async move {
409            context.spawn(|_| async move {
410                loop {
411                    reschedule().await;
412                }
413            });
414        });
415    }
416
417    fn test_spawn_abort<R: Runner>(runner: R)
418    where
419        R::Context: Spawner,
420    {
421        runner.start(|context| async move {
422            let handle = context.spawn(|_| async move {
423                loop {
424                    reschedule().await;
425                }
426            });
427            handle.abort();
428            assert!(matches!(handle.await, Err(Error::Closed)));
429        });
430    }
431
432    fn test_panic_aborts_root<R: Runner>(runner: R) {
433        let result = catch_unwind(AssertUnwindSafe(|| {
434            runner.start(|_| async move {
435                panic!("blah");
436            });
437        }));
438        result.unwrap_err();
439    }
440
441    fn test_panic_aborts_spawn<R: Runner>(runner: R)
442    where
443        R::Context: Spawner,
444    {
445        let result = runner.start(|context| async move {
446            let result = context.spawn(|_| async move {
447                panic!("blah");
448            });
449            assert!(matches!(result.await, Err(Error::Exited)));
450            Result::<(), Error>::Ok(())
451        });
452
453        // Ensure panic was caught
454        result.unwrap();
455    }
456
457    fn test_select<R: Runner>(runner: R) {
458        runner.start(|_| async move {
459            // Test first branch
460            let output = Mutex::new(0);
461            select! {
462                v1 = ready(1) => {
463                    *output.lock().unwrap() = v1;
464                },
465                v2 = ready(2) => {
466                    *output.lock().unwrap() = v2;
467                },
468            };
469            assert_eq!(*output.lock().unwrap(), 1);
470
471            // Test second branch
472            select! {
473                v1 = std::future::pending::<i32>() => {
474                    *output.lock().unwrap() = v1;
475                },
476                v2 = ready(2) => {
477                    *output.lock().unwrap() = v2;
478                },
479            };
480            assert_eq!(*output.lock().unwrap(), 2);
481        });
482    }
483
484    /// Ensure future fusing works as expected.
485    fn test_select_loop<R: Runner>(runner: R)
486    where
487        R::Context: Clock,
488    {
489        runner.start(|context| async move {
490            // Should hit timeout
491            let (mut sender, mut receiver) = mpsc::unbounded();
492            for _ in 0..2 {
493                select! {
494                    v = receiver.next() => {
495                        panic!("unexpected value: {:?}", v);
496                    },
497                    _ = context.sleep(Duration::from_millis(100)) => {
498                        continue;
499                    },
500                };
501            }
502
503            // Populate channel
504            sender.send(0).await.unwrap();
505            sender.send(1).await.unwrap();
506
507            // Prefer not reading channel without losing messages
508            select! {
509                _ = async {} => {
510                    // Skip reading from channel even though populated
511                },
512                v = receiver.next() => {
513                    panic!("unexpected value: {:?}", v);
514                },
515            };
516
517            // Process messages
518            for i in 0..2 {
519                select! {
520                    _ = context.sleep(Duration::from_millis(100)) => {
521                        panic!("timeout");
522                    },
523                    v = receiver.next() => {
524                        assert_eq!(v.unwrap(), i);
525                    },
526                };
527            }
528        });
529    }
530
531    fn test_storage_operations<R: Runner>(runner: R)
532    where
533        R::Context: Storage,
534    {
535        runner.start(|context| async move {
536            let partition = "test_partition";
537            let name = b"test_blob";
538
539            // Open a new blob
540            let (blob, _) = context
541                .open(partition, name)
542                .await
543                .expect("Failed to open blob");
544
545            // Write data to the blob
546            let data = b"Hello, Storage!";
547            blob.write_at(data, 0)
548                .await
549                .expect("Failed to write to blob");
550
551            // Sync the blob
552            blob.sync().await.expect("Failed to sync blob");
553
554            // Read data from the blob
555            let mut buffer = vec![0u8; data.len()];
556            blob.read_at(&mut buffer, 0)
557                .await
558                .expect("Failed to read from blob");
559            assert_eq!(&buffer, data);
560
561            // Close the blob
562            blob.close().await.expect("Failed to close blob");
563
564            // Scan blobs in the partition
565            let blobs = context
566                .scan(partition)
567                .await
568                .expect("Failed to scan partition");
569            assert!(blobs.contains(&name.to_vec()));
570
571            // Reopen the blob
572            let (blob, len) = context
573                .open(partition, name)
574                .await
575                .expect("Failed to reopen blob");
576            assert_eq!(len, data.len() as u64);
577
578            // Read data part of message back
579            let mut buffer = vec![0u8; 7];
580            blob.read_at(&mut buffer, 7)
581                .await
582                .expect("Failed to read data");
583            assert_eq!(&buffer, b"Storage");
584
585            // Close the blob
586            blob.close().await.expect("Failed to close blob");
587
588            // Remove the blob
589            context
590                .remove(partition, Some(name))
591                .await
592                .expect("Failed to remove blob");
593
594            // Ensure the blob is removed
595            let blobs = context
596                .scan(partition)
597                .await
598                .expect("Failed to scan partition");
599            assert!(!blobs.contains(&name.to_vec()));
600
601            // Remove the partition
602            context
603                .remove(partition, None)
604                .await
605                .expect("Failed to remove partition");
606
607            // Scan the partition
608            let result = context.scan(partition).await;
609            assert!(matches!(result, Err(Error::PartitionMissing(_))));
610        });
611    }
612
613    fn test_blob_read_write<R: Runner>(runner: R)
614    where
615        R::Context: Storage,
616    {
617        runner.start(|context| async move {
618            let partition = "test_partition";
619            let name = b"test_blob_rw";
620
621            // Open a new blob
622            let (blob, _) = context
623                .open(partition, name)
624                .await
625                .expect("Failed to open blob");
626
627            // Write data at different offsets
628            let data1 = b"Hello";
629            let data2 = b"World";
630            blob.write_at(data1, 0)
631                .await
632                .expect("Failed to write data1");
633            blob.write_at(data2, 5)
634                .await
635                .expect("Failed to write data2");
636
637            // Read data back
638            let mut buffer = vec![0u8; 10];
639            blob.read_at(&mut buffer, 0)
640                .await
641                .expect("Failed to read data");
642            assert_eq!(&buffer[..5], data1);
643            assert_eq!(&buffer[5..], data2);
644
645            // Rewrite data without affecting length
646            let data3 = b"Store";
647            blob.write_at(data3, 5)
648                .await
649                .expect("Failed to write data3");
650
651            // Truncate the blob
652            blob.truncate(5).await.expect("Failed to truncate blob");
653            let mut buffer = vec![0u8; 5];
654            blob.read_at(&mut buffer, 0)
655                .await
656                .expect("Failed to read data");
657            assert_eq!(&buffer[..5], data1);
658
659            // Full read after truncation
660            let mut buffer = vec![0u8; 10];
661            let result = blob.read_at(&mut buffer, 0).await;
662            assert!(result.is_err());
663
664            // Close the blob
665            blob.close().await.expect("Failed to close blob");
666        });
667    }
668
669    fn test_many_partition_read_write<R: Runner>(runner: R)
670    where
671        R::Context: Storage,
672    {
673        runner.start(|context| async move {
674            let partitions = ["partition1", "partition2", "partition3"];
675            let name = b"test_blob_rw";
676            let data1 = b"Hello";
677            let data2 = b"World";
678
679            for (additional, partition) in partitions.iter().enumerate() {
680                // Open a new blob
681                let (blob, _) = context
682                    .open(partition, name)
683                    .await
684                    .expect("Failed to open blob");
685
686                // Write data at different offsets
687                blob.write_at(data1, 0)
688                    .await
689                    .expect("Failed to write data1");
690                blob.write_at(data2, 5 + additional as u64)
691                    .await
692                    .expect("Failed to write data2");
693
694                // Close the blob
695                blob.close().await.expect("Failed to close blob");
696            }
697
698            for (additional, partition) in partitions.iter().enumerate() {
699                // Open a new blob
700                let (blob, len) = context
701                    .open(partition, name)
702                    .await
703                    .expect("Failed to open blob");
704                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
705
706                // Read data back
707                let mut buffer = vec![0u8; 10 + additional];
708                blob.read_at(&mut buffer, 0)
709                    .await
710                    .expect("Failed to read data");
711                assert_eq!(&buffer[..5], b"Hello");
712                assert_eq!(&buffer[5 + additional..], b"World");
713
714                // Close the blob
715                blob.close().await.expect("Failed to close blob");
716            }
717        });
718    }
719
720    fn test_blob_read_past_length<R: Runner>(runner: R)
721    where
722        R::Context: Storage,
723    {
724        runner.start(|context| async move {
725            let partition = "test_partition";
726            let name = b"test_blob_rw";
727
728            // Open a new blob
729            let (blob, _) = context
730                .open(partition, name)
731                .await
732                .expect("Failed to open blob");
733
734            // Read data past file length (empty file)
735            let mut buffer = vec![0u8; 10];
736            let result = blob.read_at(&mut buffer, 0).await;
737            assert!(result.is_err());
738
739            // Write data to the blob
740            let data = b"Hello, Storage!";
741            blob.write_at(data, 0)
742                .await
743                .expect("Failed to write to blob");
744
745            // Read data past file length (non-empty file)
746            let mut buffer = vec![0u8; 20];
747            let result = blob.read_at(&mut buffer, 0).await;
748            assert!(result.is_err());
749        })
750    }
751
752    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
753    where
754        R::Context: Spawner + Storage + Metrics,
755    {
756        runner.start(|context| async move {
757            let partition = "test_partition";
758            let name = b"test_blob_rw";
759
760            // Open a new blob
761            let (blob, _) = context
762                .open(partition, name)
763                .await
764                .expect("Failed to open blob");
765
766            // Write data to the blob
767            let data = b"Hello, Storage!";
768            blob.write_at(data, 0)
769                .await
770                .expect("Failed to write to blob");
771
772            // Sync the blob
773            blob.sync().await.expect("Failed to sync blob");
774
775            // Read data from the blob in clone
776            let check1 = context.with_label("check1").spawn({
777                let blob = blob.clone();
778                move |_| async move {
779                    let mut buffer = vec![0u8; data.len()];
780                    blob.read_at(&mut buffer, 0)
781                        .await
782                        .expect("Failed to read from blob");
783                    assert_eq!(&buffer, data);
784                }
785            });
786            let check2 = context.with_label("check2").spawn({
787                let blob = blob.clone();
788                move |_| async move {
789                    let mut buffer = vec![0u8; data.len()];
790                    blob.read_at(&mut buffer, 0)
791                        .await
792                        .expect("Failed to read from blob");
793                    assert_eq!(&buffer, data);
794                }
795            });
796
797            // Wait for both reads to complete
798            let result = join!(check1, check2);
799            assert!(result.0.is_ok());
800            assert!(result.1.is_ok());
801
802            // Read data from the blob
803            let mut buffer = vec![0u8; data.len()];
804            blob.read_at(&mut buffer, 0)
805                .await
806                .expect("Failed to read from blob");
807            assert_eq!(&buffer, data);
808
809            // Close the blob
810            blob.close().await.expect("Failed to close blob");
811
812            // Ensure no blobs still open
813            let buffer = context.encode();
814            assert!(buffer.contains("open_blobs 0"));
815        });
816    }
817
818    fn test_shutdown<R: Runner>(runner: R)
819    where
820        R::Context: Spawner + Metrics + Clock,
821    {
822        let kill = 9;
823        runner.start(|context| async move {
824            // Spawn a task that waits for signal
825            let before = context
826                .with_label("before")
827                .spawn(move |context| async move {
828                    let sig = context.stopped().await;
829                    assert_eq!(sig.unwrap(), kill);
830                });
831
832            // Spawn a task after stop is called
833            let after = context
834                .with_label("after")
835                .spawn(move |context| async move {
836                    // Wait for stop signal
837                    let mut signal = context.stopped();
838                    loop {
839                        select! {
840                            sig = &mut signal => {
841                                // Stopper resolved
842                                assert_eq!(sig.unwrap(), kill);
843                                break;
844                            },
845                            _ = context.sleep(Duration::from_millis(10)) => {
846                                // Continue waiting
847                            },
848                        }
849                    }
850                });
851
852            // Sleep for a bit before stopping
853            context.sleep(Duration::from_millis(50)).await;
854
855            // Signal the task
856            context.stop(kill);
857
858            // Ensure both tasks complete
859            let result = join!(before, after);
860            assert!(result.0.is_ok());
861            assert!(result.1.is_ok());
862        });
863    }
864
865    fn test_spawn_ref<R: Runner>(runner: R)
866    where
867        R::Context: Spawner,
868    {
869        runner.start(|mut context| async move {
870            let handle = context.spawn_ref();
871            let result = handle(async move { 42 }).await;
872            assert!(matches!(result, Ok(42)));
873        });
874    }
875
876    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
877    where
878        R::Context: Spawner,
879    {
880        runner.start(|mut context| async move {
881            let handle = context.spawn_ref();
882            let result = handle(async move { 42 }).await;
883            assert!(matches!(result, Ok(42)));
884
885            // Ensure context is consumed
886            let handle = context.spawn_ref();
887            let result = handle(async move { 42 }).await;
888            assert!(matches!(result, Ok(42)));
889        });
890    }
891
892    fn test_spawn_duplicate<R: Runner>(runner: R)
893    where
894        R::Context: Spawner,
895    {
896        runner.start(|mut context| async move {
897            let handle = context.spawn_ref();
898            let result = handle(async move { 42 }).await;
899            assert!(matches!(result, Ok(42)));
900
901            // Ensure context is consumed
902            context.spawn(|_| async move { 42 });
903        });
904    }
905
906    fn test_spawn_blocking<R: Runner>(runner: R)
907    where
908        R::Context: Spawner,
909    {
910        runner.start(|context| async move {
911            let handle = context.spawn_blocking(|| 42);
912            let result = handle.await;
913            assert!(matches!(result, Ok(42)));
914        });
915    }
916
917    fn test_spawn_blocking_abort<R: Runner>(runner: R)
918    where
919        R::Context: Spawner,
920    {
921        runner.start(|context| async move {
922            // Create task
923            let (sender, mut receiver) = oneshot::channel();
924            let handle = context.spawn_blocking(move || {
925                // Wait for abort to be called
926                loop {
927                    if receiver.try_recv().is_ok() {
928                        break;
929                    }
930                }
931
932                // Perform a long-running operation
933                let mut count = 0;
934                loop {
935                    count += 1;
936                    if count >= 100_000_000 {
937                        break;
938                    }
939                }
940                count
941            });
942
943            // Abort the task
944            //
945            // If there was an `.await` prior to sending a message over the oneshot, this test
946            // could deadlock (depending on the runtime implementation) because the blocking task
947            // would never yield (preventing send from being called).
948            handle.abort();
949            sender.send(()).unwrap();
950
951            // Wait for the task to complete
952            assert!(matches!(handle.await, Ok(100_000_000)));
953        });
954    }
955
956    fn test_metrics<R: Runner>(runner: R)
957    where
958        R::Context: Metrics,
959    {
960        runner.start(|context| async move {
961            // Assert label
962            assert_eq!(context.label(), "");
963
964            // Register a metric
965            let counter = Counter::<u64>::default();
966            context.register("test", "test", counter.clone());
967
968            // Increment the counter
969            counter.inc();
970
971            // Encode metrics
972            let buffer = context.encode();
973            assert!(buffer.contains("test_total 1"));
974
975            // Nested context
976            let context = context.with_label("nested");
977            let nested_counter = Counter::<u64>::default();
978            context.register("test", "test", nested_counter.clone());
979
980            // Increment the counter
981            nested_counter.inc();
982
983            // Encode metrics
984            let buffer = context.encode();
985            assert!(buffer.contains("nested_test_total 1"));
986            assert!(buffer.contains("test_total 1"));
987        });
988    }
989
990    fn test_metrics_label<R: Runner>(runner: R)
991    where
992        R::Context: Metrics,
993    {
994        runner.start(|context| async move {
995            context.with_label(METRICS_PREFIX);
996        })
997    }
998
999    #[test]
1000    fn test_deterministic_future() {
1001        let runner = deterministic::Runner::default();
1002        test_error_future(runner);
1003    }
1004
1005    #[test]
1006    fn test_deterministic_clock_sleep() {
1007        let executor = deterministic::Runner::default();
1008        test_clock_sleep(executor);
1009    }
1010
1011    #[test]
1012    fn test_deterministic_clock_sleep_until() {
1013        let executor = deterministic::Runner::default();
1014        test_clock_sleep_until(executor);
1015    }
1016
1017    #[test]
1018    fn test_deterministic_root_finishes() {
1019        let executor = deterministic::Runner::default();
1020        test_root_finishes(executor);
1021    }
1022
1023    #[test]
1024    fn test_deterministic_spawn_abort() {
1025        let executor = deterministic::Runner::default();
1026        test_spawn_abort(executor);
1027    }
1028
1029    #[test]
1030    fn test_deterministic_panic_aborts_root() {
1031        let runner = deterministic::Runner::default();
1032        test_panic_aborts_root(runner);
1033    }
1034
1035    #[test]
1036    #[should_panic(expected = "blah")]
1037    fn test_deterministic_panic_aborts_spawn() {
1038        let executor = deterministic::Runner::default();
1039        test_panic_aborts_spawn(executor);
1040    }
1041
1042    #[test]
1043    fn test_deterministic_select() {
1044        let executor = deterministic::Runner::default();
1045        test_select(executor);
1046    }
1047
1048    #[test]
1049    fn test_deterministic_select_loop() {
1050        let executor = deterministic::Runner::default();
1051        test_select_loop(executor);
1052    }
1053
1054    #[test]
1055    fn test_deterministic_storage_operations() {
1056        let executor = deterministic::Runner::default();
1057        test_storage_operations(executor);
1058    }
1059
1060    #[test]
1061    fn test_deterministic_blob_read_write() {
1062        let executor = deterministic::Runner::default();
1063        test_blob_read_write(executor);
1064    }
1065
1066    #[test]
1067    fn test_deterministic_many_partition_read_write() {
1068        let executor = deterministic::Runner::default();
1069        test_many_partition_read_write(executor);
1070    }
1071
1072    #[test]
1073    fn test_deterministic_blob_read_past_length() {
1074        let executor = deterministic::Runner::default();
1075        test_blob_read_past_length(executor);
1076    }
1077
1078    #[test]
1079    fn test_deterministic_blob_clone_and_concurrent_read() {
1080        // Run test
1081        let executor = deterministic::Runner::default();
1082        test_blob_clone_and_concurrent_read(executor);
1083    }
1084
1085    #[test]
1086    fn test_deterministic_shutdown() {
1087        let executor = deterministic::Runner::default();
1088        test_shutdown(executor);
1089    }
1090
1091    #[test]
1092    fn test_deterministic_spawn_ref() {
1093        let executor = deterministic::Runner::default();
1094        test_spawn_ref(executor);
1095    }
1096
1097    #[test]
1098    #[should_panic]
1099    fn test_deterministic_spawn_ref_duplicate() {
1100        let executor = deterministic::Runner::default();
1101        test_spawn_ref_duplicate(executor);
1102    }
1103
1104    #[test]
1105    #[should_panic]
1106    fn test_deterministic_spawn_duplicate() {
1107        let executor = deterministic::Runner::default();
1108        test_spawn_duplicate(executor);
1109    }
1110
1111    #[test]
1112    fn test_deterministic_spawn_blocking() {
1113        let executor = deterministic::Runner::default();
1114        test_spawn_blocking(executor);
1115    }
1116
1117    #[test]
1118    #[should_panic(expected = "blocking task panicked")]
1119    fn test_deterministic_spawn_blocking_panic() {
1120        let executor = deterministic::Runner::default();
1121        executor.start(|context| async move {
1122            let handle = context.spawn_blocking(|| {
1123                panic!("blocking task panicked");
1124            });
1125            handle.await.unwrap();
1126        });
1127    }
1128
1129    #[test]
1130    fn test_deterministic_spawn_blocking_abort() {
1131        let executor = deterministic::Runner::default();
1132        test_spawn_blocking_abort(executor);
1133    }
1134
1135    #[test]
1136    fn test_deterministic_metrics() {
1137        let executor = deterministic::Runner::default();
1138        test_metrics(executor);
1139    }
1140
1141    #[test]
1142    #[should_panic]
1143    fn test_deterministic_metrics_label() {
1144        let executor = deterministic::Runner::default();
1145        test_metrics_label(executor);
1146    }
1147
1148    #[test]
1149    fn test_tokio_error_future() {
1150        let runner = tokio::Runner::default();
1151        test_error_future(runner);
1152    }
1153
1154    #[test]
1155    fn test_tokio_clock_sleep() {
1156        let executor = tokio::Runner::default();
1157        test_clock_sleep(executor);
1158    }
1159
1160    #[test]
1161    fn test_tokio_clock_sleep_until() {
1162        let executor = tokio::Runner::default();
1163        test_clock_sleep_until(executor);
1164    }
1165
1166    #[test]
1167    fn test_tokio_root_finishes() {
1168        let executor = tokio::Runner::default();
1169        test_root_finishes(executor);
1170    }
1171
1172    #[test]
1173    fn test_tokio_spawn_abort() {
1174        let executor = tokio::Runner::default();
1175        test_spawn_abort(executor);
1176    }
1177
1178    #[test]
1179    fn test_tokio_panic_aborts_root() {
1180        let executor = tokio::Runner::default();
1181        test_panic_aborts_root(executor);
1182    }
1183
1184    #[test]
1185    fn test_tokio_panic_aborts_spawn() {
1186        let executor = tokio::Runner::default();
1187        test_panic_aborts_spawn(executor);
1188    }
1189
1190    #[test]
1191    fn test_tokio_select() {
1192        let executor = tokio::Runner::default();
1193        test_select(executor);
1194    }
1195
1196    #[test]
1197    fn test_tokio_select_loop() {
1198        let executor = tokio::Runner::default();
1199        test_select_loop(executor);
1200    }
1201
1202    #[test]
1203    fn test_tokio_storage_operations() {
1204        let executor = tokio::Runner::default();
1205        test_storage_operations(executor);
1206    }
1207
1208    #[test]
1209    fn test_tokio_blob_read_write() {
1210        let executor = tokio::Runner::default();
1211        test_blob_read_write(executor);
1212    }
1213
1214    #[test]
1215    fn test_tokio_many_partition_read_write() {
1216        let executor = tokio::Runner::default();
1217        test_many_partition_read_write(executor);
1218    }
1219
1220    #[test]
1221    fn test_tokio_blob_read_past_length() {
1222        let executor = tokio::Runner::default();
1223        test_blob_read_past_length(executor);
1224    }
1225
1226    #[test]
1227    fn test_tokio_blob_clone_and_concurrent_read() {
1228        // Run test
1229        let executor = tokio::Runner::default();
1230        test_blob_clone_and_concurrent_read(executor);
1231    }
1232
1233    #[test]
1234    fn test_tokio_shutdown() {
1235        let executor = tokio::Runner::default();
1236        test_shutdown(executor);
1237    }
1238
1239    #[test]
1240    fn test_tokio_spawn_ref() {
1241        let executor = tokio::Runner::default();
1242        test_spawn_ref(executor);
1243    }
1244
1245    #[test]
1246    #[should_panic]
1247    fn test_tokio_spawn_ref_duplicate() {
1248        let executor = tokio::Runner::default();
1249        test_spawn_ref_duplicate(executor);
1250    }
1251
1252    #[test]
1253    #[should_panic]
1254    fn test_tokio_spawn_duplicate() {
1255        let executor = tokio::Runner::default();
1256        test_spawn_duplicate(executor);
1257    }
1258
1259    #[test]
1260    fn test_tokio_spawn_blocking() {
1261        let executor = tokio::Runner::default();
1262        test_spawn_blocking(executor);
1263    }
1264
1265    #[test]
1266    fn test_tokio_spawn_blocking_panic() {
1267        let executor = tokio::Runner::default();
1268        executor.start(|context| async move {
1269            let handle = context.spawn_blocking(|| {
1270                panic!("blocking task panicked");
1271            });
1272            let result = handle.await;
1273            assert!(matches!(result, Err(Error::Exited)));
1274        });
1275    }
1276
1277    #[test]
1278    fn test_tokio_spawn_blocking_abort() {
1279        let executor = tokio::Runner::default();
1280        test_spawn_blocking_abort(executor);
1281    }
1282
1283    #[test]
1284    fn test_tokio_metrics() {
1285        let executor = tokio::Runner::default();
1286        test_metrics(executor);
1287    }
1288
1289    #[test]
1290    #[should_panic]
1291    fn test_tokio_metrics_label() {
1292        let executor = tokio::Runner::default();
1293        test_metrics_label(executor);
1294    }
1295
1296    #[test]
1297    fn test_tokio_telemetry() {
1298        let executor = tokio::Runner::default();
1299        executor.start(|context| async move {
1300            // Define the server address
1301            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1302
1303            // Configure telemetry
1304            tokio::telemetry::init(
1305                context.with_label("metrics"),
1306                tokio::telemetry::Logging {
1307                    level: Level::INFO,
1308                    json: false,
1309                },
1310                Some(address),
1311                None,
1312            );
1313
1314            // Register a test metric
1315            let counter: Counter<u64> = Counter::default();
1316            context.register("test_counter", "Test counter", counter.clone());
1317            counter.inc();
1318
1319            // Helper functions to parse HTTP response
1320            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1321                let mut line = Vec::new();
1322                loop {
1323                    let mut byte = [0; 1];
1324                    stream.recv(&mut byte).await?;
1325                    if byte[0] == b'\n' {
1326                        if line.last() == Some(&b'\r') {
1327                            line.pop(); // Remove trailing \r
1328                        }
1329                        break;
1330                    }
1331                    line.push(byte[0]);
1332                }
1333                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1334            }
1335
1336            async fn read_headers<St: Stream>(
1337                stream: &mut St,
1338            ) -> Result<HashMap<String, String>, Error> {
1339                let mut headers = HashMap::new();
1340                loop {
1341                    let line = read_line(stream).await?;
1342                    if line.is_empty() {
1343                        break;
1344                    }
1345                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1346                    if parts.len() == 2 {
1347                        headers.insert(parts[0].to_string(), parts[1].to_string());
1348                    }
1349                }
1350                Ok(headers)
1351            }
1352
1353            async fn read_body<St: Stream>(
1354                stream: &mut St,
1355                content_length: usize,
1356            ) -> Result<String, Error> {
1357                let mut body = vec![0; content_length];
1358                stream.recv(&mut body).await?;
1359                String::from_utf8(body).map_err(|_| Error::ReadFailed)
1360            }
1361
1362            // Simulate a client connecting to the server
1363            let client_handle = context
1364                .with_label("client")
1365                .spawn(move |context| async move {
1366                    let (mut sink, mut stream) = loop {
1367                        match context.dial(address).await {
1368                            Ok((sink, stream)) => break (sink, stream),
1369                            Err(e) => {
1370                                // The client may be polled before the server is ready, that's alright!
1371                                error!(err =?e, "failed to connect");
1372                                context.sleep(Duration::from_millis(10)).await;
1373                            }
1374                        }
1375                    };
1376
1377                    // Send a GET request to the server
1378                    let request = format!(
1379                        "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1380                        address
1381                    );
1382                    sink.send(request.as_bytes()).await.unwrap();
1383
1384                    // Read and verify the HTTP status line
1385                    let status_line = read_line(&mut stream).await.unwrap();
1386                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1387
1388                    // Read and parse headers
1389                    let headers = read_headers(&mut stream).await.unwrap();
1390                    println!("Headers: {:?}", headers);
1391                    let content_length = headers
1392                        .get("content-length")
1393                        .unwrap()
1394                        .parse::<usize>()
1395                        .unwrap();
1396
1397                    // Read and verify the body
1398                    let body = read_body(&mut stream, content_length).await.unwrap();
1399                    assert!(body.contains("test_counter_total 1"));
1400                });
1401
1402            // Wait for the client task to complete
1403            client_handle.await.unwrap();
1404        });
1405    }
1406}