commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use prometheus_client::registry::Metric;
21use std::{
22    future::Future,
23    net::SocketAddr,
24    time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31    if #[cfg(not(target_arch = "wasm32"))] {
32        pub mod tokio;
33        pub mod benchmarks;
34    }
35}
36mod storage;
37pub mod telemetry;
38mod utils;
39pub use utils::{create_pool, reschedule, Handle, Signal, Signaler};
40
41/// Prefix for runtime metrics.
42const METRICS_PREFIX: &str = "runtime";
43
44/// Errors that can occur when interacting with the runtime.
45#[derive(Error, Debug, PartialEq)]
46pub enum Error {
47    #[error("exited")]
48    Exited,
49    #[error("closed")]
50    Closed,
51    #[error("timeout")]
52    Timeout,
53    #[error("bind failed")]
54    BindFailed,
55    #[error("connection failed")]
56    ConnectionFailed,
57    #[error("write failed")]
58    WriteFailed,
59    #[error("read failed")]
60    ReadFailed,
61    #[error("send failed")]
62    SendFailed,
63    #[error("recv failed")]
64    RecvFailed,
65    #[error("partition creation failed: {0}")]
66    PartitionCreationFailed(String),
67    #[error("partition missing: {0}")]
68    PartitionMissing(String),
69    #[error("partition corrupt: {0}")]
70    PartitionCorrupt(String),
71    #[error("blob open failed: {0}/{1}")]
72    BlobOpenFailed(String, String),
73    #[error("blob missing: {0}/{1}")]
74    BlobMissing(String, String),
75    #[error("blob truncate failed: {0}/{1}")]
76    BlobTruncateFailed(String, String),
77    #[error("blob sync failed: {0}/{1}")]
78    BlobSyncFailed(String, String),
79    #[error("blob close failed: {0}/{1}")]
80    BlobCloseFailed(String, String),
81    #[error("blob insufficient length")]
82    BlobInsufficientLength,
83    #[error("offset overflow")]
84    OffsetOverflow,
85}
86
87/// Interface that any task scheduler must implement to start
88/// running tasks.
89pub trait Runner {
90    /// Start running a root task.
91    ///
92    /// The root task does not create the initial context because it can be useful to have a reference
93    /// to context before starting task execution.
94    fn start<F>(self, f: F) -> F::Output
95    where
96        F: Future;
97}
98
99/// Interface that any task scheduler must implement to spawn tasks.
100pub trait Spawner: Clone + Send + Sync + 'static {
101    /// Enqueue a task to be executed.
102    ///
103    /// Unlike a future, a spawned task will start executing immediately (even if the caller
104    /// does not await the handle).
105    ///
106    /// Spawned tasks consume the context used to create them. This ensures that context cannot
107    /// be shared between tasks and that a task's context always comes from somewhere.
108    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
109    where
110        F: FnOnce(Self) -> Fut + Send + 'static,
111        Fut: Future<Output = T> + Send + 'static,
112        T: Send + 'static;
113
114    /// Enqueue a task to be executed (without consuming the context).
115    ///
116    /// Unlike a future, a spawned task will start executing immediately (even if the caller
117    /// does not await the handle).
118    ///
119    /// In some cases, it may be useful to spawn a task without consuming the context (e.g. starting
120    /// an actor that already has a reference to context).
121    ///
122    /// # Warning
123    ///
124    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
125    /// to prevent accidental misuse.
126    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
127    where
128        F: Future<Output = T> + Send + 'static,
129        T: Send + 'static;
130
131    /// Enqueue a blocking task to be executed.
132    ///
133    /// This method is designed for synchronous, potentially long-running operations that should
134    /// not block the asynchronous event loop. The task starts executing immediately, and the
135    /// returned handle can be awaited to retrieve the result.
136    ///
137    /// # Warning
138    ///
139    /// Blocking tasks cannot be aborted.
140    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
141    where
142        F: FnOnce() -> T + Send + 'static,
143        T: Send + 'static;
144
145    /// Signals the runtime to stop execution and that all outstanding tasks
146    /// should perform any required cleanup and exit. This method is idempotent and
147    /// can be called multiple times.
148    ///
149    /// This method does not actually kill any tasks but rather signals to them, using
150    /// the `Signal` returned by `stopped`, that they should exit.
151    fn stop(&self, value: i32);
152
153    /// Returns an instance of a `Signal` that resolves when `stop` is called by
154    /// any task.
155    ///
156    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
157    fn stopped(&self) -> Signal;
158}
159
160/// Interface to register and encode metrics.
161pub trait Metrics: Clone + Send + Sync + 'static {
162    /// Get the current label of the context.
163    fn label(&self) -> String;
164
165    /// Create a new instance of `Metrics` with the given label appended to the end
166    /// of the current `Metrics` label.
167    ///
168    /// This is commonly used to create a nested context for `register`.
169    ///
170    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
171    /// label (reserved for metrics for the runtime).
172    fn with_label(&self, label: &str) -> Self;
173
174    /// Prefix the given label with the current context's label.
175    ///
176    /// Unlike `with_label`, this method does not create a new context.
177    fn scoped_label(&self, label: &str) -> String {
178        let label = if self.label().is_empty() {
179            label.to_string()
180        } else {
181            format!("{}_{}", self.label(), label)
182        };
183        assert!(
184            !label.starts_with(METRICS_PREFIX),
185            "using runtime label is not allowed"
186        );
187        label
188    }
189
190    /// Register a metric with the runtime.
191    ///
192    /// Any registered metric will include (as a prefix) the label of the current context.
193    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
194
195    /// Encode all metrics into a buffer.
196    fn encode(&self) -> String;
197}
198
199/// Interface that any task scheduler must implement to provide
200/// time-based operations.
201///
202/// It is necessary to mock time to provide deterministic execution
203/// of arbitrary tasks.
204pub trait Clock: Clone + Send + Sync + 'static {
205    /// Returns the current time.
206    fn current(&self) -> SystemTime;
207
208    /// Sleep for the given duration.
209    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
210
211    /// Sleep until the given deadline.
212    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
213}
214
215/// Interface that any runtime must implement to create
216/// network connections.
217pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
218where
219    L: Listener<Si, St>,
220    Si: Sink,
221    St: Stream,
222{
223    /// Bind to the given socket address.
224    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
225
226    /// Dial the given socket address.
227    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
228}
229
230/// Interface that any runtime must implement to handle
231/// incoming network connections.
232pub trait Listener<Si, St>: Sync + Send + 'static
233where
234    Si: Sink,
235    St: Stream,
236{
237    /// Accept an incoming connection.
238    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
239}
240
241/// Interface that any runtime must implement to send
242/// messages over a network connection.
243pub trait Sink: Sync + Send + 'static {
244    /// Send a message to the sink.
245    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
246}
247
248/// Interface that any runtime must implement to receive
249/// messages over a network connection.
250pub trait Stream: Sync + Send + 'static {
251    /// Receive a message from the stream, storing it in the given buffer.
252    /// Reads exactly the number of bytes that fit in the buffer.
253    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
254}
255
256/// Interface to interact with storage.
257///
258///
259/// To support storage implementations that enable concurrent reads and
260/// writes, blobs are responsible for maintaining synchronization.
261///
262/// Storage can be backed by a local filesystem, cloud storage, etc.
263pub trait Storage: Clone + Send + Sync + 'static {
264    /// The readable/writeable storage buffer that can be opened by this Storage.
265    type Blob: Blob;
266
267    /// Open an existing blob in a given partition or create a new one.
268    ///
269    /// Multiple instances of the same blob can be opened concurrently, however,
270    /// writing to the same blob concurrently may lead to undefined behavior.
271    fn open(
272        &self,
273        partition: &str,
274        name: &[u8],
275    ) -> impl Future<Output = Result<Self::Blob, Error>> + Send;
276
277    /// Remove a blob from a given partition.
278    ///
279    /// If no `name` is provided, the entire partition is removed.
280    fn remove(
281        &self,
282        partition: &str,
283        name: Option<&[u8]>,
284    ) -> impl Future<Output = Result<(), Error>> + Send;
285
286    /// Return all blobs in a given partition.
287    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
288}
289
290/// Interface to read and write to a blob.
291///
292/// To support blob implementations that enable concurrent reads and
293/// writes, blobs are responsible for maintaining synchronization.
294///
295/// Cloning a blob is similar to wrapping a single file descriptor in
296/// a lock whereas opening a new blob (of the same name) is similar to
297/// opening a new file descriptor. If multiple blobs are opened with the same
298/// name, they are not expected to coordinate access to underlying storage
299/// and writing to both is undefined behavior.
300#[allow(clippy::len_without_is_empty)]
301pub trait Blob: Clone + Send + Sync + 'static {
302    /// Get the length of the blob.
303    fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
304
305    /// Read from the blob at the given offset.
306    ///
307    /// `read_at` does not return the number of bytes read because it
308    /// only returns once the entire buffer has been filled.
309    fn read_at(
310        &self,
311        buf: &mut [u8],
312        offset: u64,
313    ) -> impl Future<Output = Result<(), Error>> + Send;
314
315    /// Write to the blob at the given offset.
316    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
317
318    /// Truncate the blob to the given length.
319    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
320
321    /// Ensure all pending data is durably persisted.
322    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
323
324    /// Close the blob.
325    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
326}
327
328#[cfg(test)]
329mod tests {
330    use super::*;
331    use commonware_macros::select;
332    use futures::channel::oneshot;
333    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
334    use prometheus_client::metrics::counter::Counter;
335    use std::collections::HashMap;
336    use std::panic::{catch_unwind, AssertUnwindSafe};
337    use std::str::FromStr;
338    use std::sync::Mutex;
339    use telemetry::metrics;
340    use tracing::error;
341    use utils::reschedule;
342
343    fn test_error_future(runner: impl Runner) {
344        async fn error_future() -> Result<&'static str, &'static str> {
345            Err("An error occurred")
346        }
347        let result = runner.start(error_future());
348        assert_eq!(result, Err("An error occurred"));
349    }
350
351    fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
352        runner.start(async move {
353            // Capture initial time
354            let start = context.current();
355            let sleep_duration = Duration::from_millis(10);
356            context.sleep(sleep_duration).await;
357
358            // After run, time should have advanced
359            let end = context.current();
360            assert!(end.duration_since(start).unwrap() >= sleep_duration);
361        });
362    }
363
364    fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
365        runner.start(async move {
366            // Trigger sleep
367            let now = context.current();
368            context.sleep_until(now + Duration::from_millis(100)).await;
369
370            // Ensure slept duration has elapsed
371            let elapsed = now.elapsed().unwrap();
372            assert!(elapsed >= Duration::from_millis(100));
373        });
374    }
375
376    fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
377        runner.start(async move {
378            context.spawn(|_| async move {
379                loop {
380                    reschedule().await;
381                }
382            });
383        });
384    }
385
386    fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
387        runner.start(async move {
388            let handle = context.spawn(|_| async move {
389                loop {
390                    reschedule().await;
391                }
392            });
393            handle.abort();
394            assert_eq!(handle.await, Err(Error::Closed));
395        });
396    }
397
398    fn test_panic_aborts_root(runner: impl Runner) {
399        let result = catch_unwind(AssertUnwindSafe(|| {
400            runner.start(async move {
401                panic!("blah");
402            });
403        }));
404        result.unwrap_err();
405    }
406
407    fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
408        let result = runner.start(async move {
409            let result = context.spawn(|_| async move {
410                panic!("blah");
411            });
412            assert_eq!(result.await, Err(Error::Exited));
413            Result::<(), Error>::Ok(())
414        });
415
416        // Ensure panic was caught
417        result.unwrap();
418    }
419
420    fn test_select(runner: impl Runner) {
421        runner.start(async move {
422            // Test first branch
423            let output = Mutex::new(0);
424            select! {
425                v1 = ready(1) => {
426                    *output.lock().unwrap() = v1;
427                },
428                v2 = ready(2) => {
429                    *output.lock().unwrap() = v2;
430                },
431            };
432            assert_eq!(*output.lock().unwrap(), 1);
433
434            // Test second branch
435            select! {
436                v1 = std::future::pending::<i32>() => {
437                    *output.lock().unwrap() = v1;
438                },
439                v2 = ready(2) => {
440                    *output.lock().unwrap() = v2;
441                },
442            };
443            assert_eq!(*output.lock().unwrap(), 2);
444        });
445    }
446
447    /// Ensure future fusing works as expected.
448    fn test_select_loop(runner: impl Runner, context: impl Clock) {
449        runner.start(async move {
450            // Should hit timeout
451            let (mut sender, mut receiver) = mpsc::unbounded();
452            for _ in 0..2 {
453                select! {
454                    v = receiver.next() => {
455                        panic!("unexpected value: {:?}", v);
456                    },
457                    _ = context.sleep(Duration::from_millis(100)) => {
458                        continue;
459                    },
460                };
461            }
462
463            // Populate channel
464            sender.send(0).await.unwrap();
465            sender.send(1).await.unwrap();
466
467            // Prefer not reading channel without losing messages
468            select! {
469                _ = async {} => {
470                    // Skip reading from channel even though populated
471                },
472                v = receiver.next() => {
473                    panic!("unexpected value: {:?}", v);
474                },
475            };
476
477            // Process messages
478            for i in 0..2 {
479                select! {
480                    _ = context.sleep(Duration::from_millis(100)) => {
481                        panic!("timeout");
482                    },
483                    v = receiver.next() => {
484                        assert_eq!(v.unwrap(), i);
485                    },
486                };
487            }
488        });
489    }
490
491    fn test_storage_operations(runner: impl Runner, context: impl Spawner + Storage) {
492        runner.start(async move {
493            let partition = "test_partition";
494            let name = b"test_blob";
495
496            // Open a new blob
497            let blob = context
498                .open(partition, name)
499                .await
500                .expect("Failed to open blob");
501
502            // Write data to the blob
503            let data = b"Hello, Storage!";
504            blob.write_at(data, 0)
505                .await
506                .expect("Failed to write to blob");
507
508            // Sync the blob
509            blob.sync().await.expect("Failed to sync blob");
510
511            // Read data from the blob
512            let mut buffer = vec![0u8; data.len()];
513            blob.read_at(&mut buffer, 0)
514                .await
515                .expect("Failed to read from blob");
516            assert_eq!(&buffer, data);
517
518            // Get blob length
519            let length = blob.len().await.expect("Failed to get blob length");
520            assert_eq!(length, data.len() as u64);
521
522            // Close the blob
523            blob.close().await.expect("Failed to close blob");
524
525            // Scan blobs in the partition
526            let blobs = context
527                .scan(partition)
528                .await
529                .expect("Failed to scan partition");
530            assert!(blobs.contains(&name.to_vec()));
531
532            // Reopen the blob
533            let blob = context
534                .open(partition, name)
535                .await
536                .expect("Failed to reopen blob");
537
538            // Read data part of message back
539            let mut buffer = vec![0u8; 7];
540            blob.read_at(&mut buffer, 7)
541                .await
542                .expect("Failed to read data");
543            assert_eq!(&buffer, b"Storage");
544
545            // Close the blob
546            blob.close().await.expect("Failed to close blob");
547
548            // Remove the blob
549            context
550                .remove(partition, Some(name))
551                .await
552                .expect("Failed to remove blob");
553
554            // Ensure the blob is removed
555            let blobs = context
556                .scan(partition)
557                .await
558                .expect("Failed to scan partition");
559            assert!(!blobs.contains(&name.to_vec()));
560
561            // Remove the partition
562            context
563                .remove(partition, None)
564                .await
565                .expect("Failed to remove partition");
566
567            // Scan the partition
568            let result = context.scan(partition).await;
569            assert!(matches!(result, Err(Error::PartitionMissing(_))));
570        });
571    }
572
573    fn test_blob_read_write(runner: impl Runner, context: impl Spawner + Storage) {
574        runner.start(async move {
575            let partition = "test_partition";
576            let name = b"test_blob_rw";
577
578            // Open a new blob
579            let blob = context
580                .open(partition, name)
581                .await
582                .expect("Failed to open blob");
583
584            // Write data at different offsets
585            let data1 = b"Hello";
586            let data2 = b"World";
587            blob.write_at(data1, 0)
588                .await
589                .expect("Failed to write data1");
590            blob.write_at(data2, 5)
591                .await
592                .expect("Failed to write data2");
593
594            // Assert that length tracks pending data
595            let length = blob.len().await.expect("Failed to get blob length");
596            assert_eq!(length, 10);
597
598            // Read data back
599            let mut buffer = vec![0u8; 10];
600            blob.read_at(&mut buffer, 0)
601                .await
602                .expect("Failed to read data");
603            assert_eq!(&buffer[..5], data1);
604            assert_eq!(&buffer[5..], data2);
605
606            // Rewrite data without affecting length
607            let data3 = b"Store";
608            blob.write_at(data3, 5)
609                .await
610                .expect("Failed to write data3");
611            let length = blob.len().await.expect("Failed to get blob length");
612            assert_eq!(length, 10);
613
614            // Truncate the blob
615            blob.truncate(5).await.expect("Failed to truncate blob");
616            let length = blob.len().await.expect("Failed to get blob length");
617            assert_eq!(length, 5);
618            let mut buffer = vec![0u8; 5];
619            blob.read_at(&mut buffer, 0)
620                .await
621                .expect("Failed to read data");
622            assert_eq!(&buffer[..5], data1);
623
624            // Full read after truncation
625            let mut buffer = vec![0u8; 10];
626            let result = blob.read_at(&mut buffer, 0).await;
627            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
628
629            // Close the blob
630            blob.close().await.expect("Failed to close blob");
631        });
632    }
633
634    fn test_many_partition_read_write(runner: impl Runner, context: impl Spawner + Storage) {
635        runner.start(async move {
636            let partitions = ["partition1", "partition2", "partition3"];
637            let name = b"test_blob_rw";
638
639            for (additional, partition) in partitions.iter().enumerate() {
640                // Open a new blob
641                let blob = context
642                    .open(partition, name)
643                    .await
644                    .expect("Failed to open blob");
645
646                // Write data at different offsets
647                let data1 = b"Hello";
648                let data2 = b"World";
649                blob.write_at(data1, 0)
650                    .await
651                    .expect("Failed to write data1");
652                blob.write_at(data2, 5 + additional as u64)
653                    .await
654                    .expect("Failed to write data2");
655
656                // Close the blob
657                blob.close().await.expect("Failed to close blob");
658            }
659
660            for (additional, partition) in partitions.iter().enumerate() {
661                // Open a new blob
662                let blob = context
663                    .open(partition, name)
664                    .await
665                    .expect("Failed to open blob");
666
667                // Read data back
668                let mut buffer = vec![0u8; 10 + additional];
669                blob.read_at(&mut buffer, 0)
670                    .await
671                    .expect("Failed to read data");
672                assert_eq!(&buffer[..5], b"Hello");
673                assert_eq!(&buffer[5 + additional..], b"World");
674
675                // Close the blob
676                blob.close().await.expect("Failed to close blob");
677            }
678        });
679    }
680
681    fn test_blob_read_past_length(runner: impl Runner, context: impl Spawner + Storage) {
682        runner.start(async move {
683            let partition = "test_partition";
684            let name = b"test_blob_rw";
685
686            // Open a new blob
687            let blob = context
688                .open(partition, name)
689                .await
690                .expect("Failed to open blob");
691
692            // Read data past file length (empty file)
693            let mut buffer = vec![0u8; 10];
694            let result = blob.read_at(&mut buffer, 0).await;
695            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
696
697            // Write data to the blob
698            let data = b"Hello, Storage!";
699            blob.write_at(data, 0)
700                .await
701                .expect("Failed to write to blob");
702
703            // Read data past file length (non-empty file)
704            let mut buffer = vec![0u8; 20];
705            let result = blob.read_at(&mut buffer, 0).await;
706            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
707        })
708    }
709
710    fn test_blob_clone_and_concurrent_read(
711        runner: impl Runner,
712        context: impl Spawner + Storage + Metrics,
713    ) {
714        runner.start(async move {
715            let partition = "test_partition";
716            let name = b"test_blob_rw";
717
718            // Open a new blob
719            let blob = context
720                .open(partition, name)
721                .await
722                .expect("Failed to open blob");
723
724            // Write data to the blob
725            let data = b"Hello, Storage!";
726            blob.write_at(data, 0)
727                .await
728                .expect("Failed to write to blob");
729
730            // Sync the blob
731            blob.sync().await.expect("Failed to sync blob");
732
733            // Read data from the blob in clone
734            let check1 = context.with_label("check1").spawn({
735                let blob = blob.clone();
736                move |_| async move {
737                    let mut buffer = vec![0u8; data.len()];
738                    blob.read_at(&mut buffer, 0)
739                        .await
740                        .expect("Failed to read from blob");
741                    assert_eq!(&buffer, data);
742                }
743            });
744            let check2 = context.with_label("check2").spawn({
745                let blob = blob.clone();
746                move |_| async move {
747                    let mut buffer = vec![0u8; data.len()];
748                    blob.read_at(&mut buffer, 0)
749                        .await
750                        .expect("Failed to read from blob");
751                    assert_eq!(&buffer, data);
752                }
753            });
754
755            // Wait for both reads to complete
756            let result = join!(check1, check2);
757            assert!(result.0.is_ok());
758            assert!(result.1.is_ok());
759
760            // Read data from the blob
761            let mut buffer = vec![0u8; data.len()];
762            blob.read_at(&mut buffer, 0)
763                .await
764                .expect("Failed to read from blob");
765            assert_eq!(&buffer, data);
766
767            // Get blob length
768            let length = blob.len().await.expect("Failed to get blob length");
769            assert_eq!(length, data.len() as u64);
770
771            // Close the blob
772            blob.close().await.expect("Failed to close blob");
773        });
774    }
775
776    fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
777        let kill = 9;
778        runner.start(async move {
779            // Spawn a task that waits for signal
780            let before = context
781                .with_label("before")
782                .spawn(move |context| async move {
783                    let sig = context.stopped().await;
784                    assert_eq!(sig.unwrap(), kill);
785                });
786
787            // Spawn a task after stop is called
788            let after = context
789                .with_label("after")
790                .spawn(move |context| async move {
791                    // Wait for stop signal
792                    let mut signal = context.stopped();
793                    loop {
794                        select! {
795                            sig = &mut signal => {
796                                // Stopper resolved
797                                assert_eq!(sig.unwrap(), kill);
798                                break;
799                            },
800                            _ = context.sleep(Duration::from_millis(10)) => {
801                                // Continue waiting
802                            },
803                        }
804                    }
805                });
806
807            // Sleep for a bit before stopping
808            context.sleep(Duration::from_millis(50)).await;
809
810            // Signal the task
811            context.stop(kill);
812
813            // Ensure both tasks complete
814            let result = join!(before, after);
815            assert!(result.0.is_ok());
816            assert!(result.1.is_ok());
817        });
818    }
819
820    fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
821        runner.start(async move {
822            let handle = context.spawn_ref();
823            let result = handle(async move { 42 }).await;
824            assert_eq!(result, Ok(42));
825        });
826    }
827
828    fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
829        runner.start(async move {
830            let handle = context.spawn_ref();
831            let result = handle(async move { 42 }).await;
832            assert_eq!(result, Ok(42));
833
834            // Ensure context is consumed
835            let handle = context.spawn_ref();
836            let result = handle(async move { 42 }).await;
837            assert_eq!(result, Ok(42));
838        });
839    }
840
841    fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
842        runner.start(async move {
843            let handle = context.spawn_ref();
844            let result = handle(async move { 42 }).await;
845            assert_eq!(result, Ok(42));
846
847            // Ensure context is consumed
848            context.spawn(|_| async move { 42 });
849        });
850    }
851
852    fn test_spawn_blocking(runner: impl Runner, context: impl Spawner) {
853        runner.start(async move {
854            let handle = context.spawn_blocking(|| 42);
855            let result = handle.await;
856            assert_eq!(result, Ok(42));
857        });
858    }
859
860    fn test_spawn_blocking_abort(runner: impl Runner, context: impl Spawner) {
861        runner.start(async move {
862            // Create task
863            let (sender, mut receiver) = oneshot::channel();
864            let handle = context.spawn_blocking(move || {
865                // Wait for abort to be called
866                loop {
867                    if receiver.try_recv().is_ok() {
868                        break;
869                    }
870                }
871
872                // Perform a long-running operation
873                let mut count = 0;
874                loop {
875                    count += 1;
876                    if count >= 100_000_000 {
877                        break;
878                    }
879                }
880                count
881            });
882
883            // Abort the task
884            //
885            // If there was an `.await` prior to sending a message over the oneshot, this test
886            // could deadlock (depending on the runtime implementation) because the blocking task
887            // would never yield (preventing send from being called).
888            handle.abort();
889            sender.send(()).unwrap();
890
891            // Wait for the task to complete
892            assert_eq!(handle.await, Ok(100_000_000));
893        });
894    }
895
896    fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
897        runner.start(async move {
898            // Assert label
899            assert_eq!(context.label(), "");
900
901            // Register a metric
902            let counter = Counter::<u64>::default();
903            context.register("test", "test", counter.clone());
904
905            // Increment the counter
906            counter.inc();
907
908            // Encode metrics
909            let buffer = context.encode();
910            assert!(buffer.contains("test_total 1"));
911
912            // Nested context
913            let context = context.with_label("nested");
914            let nested_counter = Counter::<u64>::default();
915            context.register("test", "test", nested_counter.clone());
916
917            // Increment the counter
918            nested_counter.inc();
919
920            // Encode metrics
921            let buffer = context.encode();
922            assert!(buffer.contains("nested_test_total 1"));
923            assert!(buffer.contains("test_total 1"));
924        });
925    }
926
927    fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
928        runner.start(async move {
929            context.with_label(METRICS_PREFIX);
930        })
931    }
932
933    fn test_metrics_serve<L, Si, St>(
934        runner: impl Runner,
935        context: impl Clock + Spawner + Metrics + Network<L, Si, St>,
936    ) where
937        L: Listener<Si, St>,
938        Si: Sink,
939        St: Stream,
940    {
941        runner.start(async move {
942            // Register a test metric
943            let counter: Counter<u64> = Counter::default();
944            context.register("test_counter", "Test counter", counter.clone());
945            counter.inc();
946
947            // Define the server address
948            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
949
950            // Start the metrics server (serves one connection and exits)
951            context
952                .with_label("server")
953                .spawn(move |context| async move {
954                    metrics::server::serve(context, address).await;
955                });
956
957            // Helper functions to parse HTTP response
958            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
959                let mut line = Vec::new();
960                loop {
961                    let mut byte = [0; 1];
962                    stream.recv(&mut byte).await?;
963                    if byte[0] == b'\n' {
964                        if line.last() == Some(&b'\r') {
965                            line.pop(); // Remove trailing \r
966                        }
967                        break;
968                    }
969                    line.push(byte[0]);
970                }
971                String::from_utf8(line).map_err(|_| Error::ReadFailed)
972            }
973
974            async fn read_headers<St: Stream>(
975                stream: &mut St,
976            ) -> Result<HashMap<String, String>, Error> {
977                let mut headers = HashMap::new();
978                loop {
979                    let line = read_line(stream).await?;
980                    if line.is_empty() {
981                        break;
982                    }
983                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
984                    if parts.len() == 2 {
985                        headers.insert(parts[0].to_string(), parts[1].to_string());
986                    }
987                }
988                Ok(headers)
989            }
990
991            async fn read_body<St: Stream>(
992                stream: &mut St,
993                content_length: usize,
994            ) -> Result<String, Error> {
995                let mut body = vec![0; content_length];
996                stream.recv(&mut body).await?;
997                String::from_utf8(body).map_err(|_| Error::ReadFailed)
998            }
999
1000            // Simulate a client connecting to the server
1001            let client_handle = context
1002                .with_label("client")
1003                .spawn(move |context| async move {
1004                    let (_, mut stream) = loop {
1005                        match context.dial(address).await {
1006                            Ok((sink, stream)) => break (sink, stream),
1007                            Err(e) => {
1008                                // The client may be polled before the server is ready, that's alright!
1009                                error!(err =?e, "failed to connect");
1010                                context.sleep(Duration::from_millis(10)).await;
1011                            }
1012                        }
1013                    };
1014
1015                    // Read and verify the HTTP status line
1016                    let status_line = read_line(&mut stream).await.unwrap();
1017                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1018
1019                    // Read and parse headers
1020                    let headers = read_headers(&mut stream).await.unwrap();
1021                    let content_length = headers
1022                        .get("Content-Length")
1023                        .unwrap()
1024                        .parse::<usize>()
1025                        .unwrap();
1026
1027                    // Read and verify the body
1028                    let body = read_body(&mut stream, content_length).await.unwrap();
1029                    assert!(body.contains("test_counter_total 1"));
1030                });
1031
1032            // Wait for the client task to complete
1033            client_handle.await.unwrap();
1034        });
1035    }
1036
1037    #[test]
1038    fn test_deterministic_future() {
1039        let (runner, _, _) = deterministic::Executor::default();
1040        test_error_future(runner);
1041    }
1042
1043    #[test]
1044    fn test_deterministic_clock_sleep() {
1045        let (executor, context, _) = deterministic::Executor::default();
1046        assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
1047        test_clock_sleep(executor, context);
1048    }
1049
1050    #[test]
1051    fn test_deterministic_clock_sleep_until() {
1052        let (executor, context, _) = deterministic::Executor::default();
1053        test_clock_sleep_until(executor, context);
1054    }
1055
1056    #[test]
1057    fn test_deterministic_root_finishes() {
1058        let (executor, context, _) = deterministic::Executor::default();
1059        test_root_finishes(executor, context);
1060    }
1061
1062    #[test]
1063    fn test_deterministic_spawn_abort() {
1064        let (executor, context, _) = deterministic::Executor::default();
1065        test_spawn_abort(executor, context);
1066    }
1067
1068    #[test]
1069    fn test_deterministic_panic_aborts_root() {
1070        let (runner, _, _) = deterministic::Executor::default();
1071        test_panic_aborts_root(runner);
1072    }
1073
1074    #[test]
1075    #[should_panic(expected = "blah")]
1076    fn test_deterministic_panic_aborts_spawn() {
1077        let (executor, context, _) = deterministic::Executor::default();
1078        test_panic_aborts_spawn(executor, context);
1079    }
1080
1081    #[test]
1082    fn test_deterministic_select() {
1083        let (executor, _, _) = deterministic::Executor::default();
1084        test_select(executor);
1085    }
1086
1087    #[test]
1088    fn test_deterministic_select_loop() {
1089        let (executor, context, _) = deterministic::Executor::default();
1090        test_select_loop(executor, context);
1091    }
1092
1093    #[test]
1094    fn test_deterministic_storage_operations() {
1095        let (executor, context, _) = deterministic::Executor::default();
1096        test_storage_operations(executor, context);
1097    }
1098
1099    #[test]
1100    fn test_deterministic_blob_read_write() {
1101        let (executor, context, _) = deterministic::Executor::default();
1102        test_blob_read_write(executor, context);
1103    }
1104
1105    #[test]
1106    fn test_deterministic_many_partition_read_write() {
1107        let (executor, context, _) = deterministic::Executor::default();
1108        test_many_partition_read_write(executor, context);
1109    }
1110
1111    #[test]
1112    fn test_deterministic_blob_read_past_length() {
1113        let (executor, context, _) = deterministic::Executor::default();
1114        test_blob_read_past_length(executor, context);
1115    }
1116
1117    #[test]
1118    fn test_deterministic_blob_clone_and_concurrent_read() {
1119        // Run test
1120        let (executor, context, _) = deterministic::Executor::default();
1121        test_blob_clone_and_concurrent_read(executor, context.clone());
1122
1123        // Ensure no blobs still open
1124        let buffer = context.encode();
1125        assert!(buffer.contains("open_blobs 0"));
1126    }
1127
1128    #[test]
1129    fn test_deterministic_shutdown() {
1130        let (executor, context, _) = deterministic::Executor::default();
1131        test_shutdown(executor, context);
1132    }
1133
1134    #[test]
1135    fn test_deterministic_spawn_ref() {
1136        let (executor, context, _) = deterministic::Executor::default();
1137        test_spawn_ref(executor, context);
1138    }
1139
1140    #[test]
1141    #[should_panic]
1142    fn test_deterministic_spawn_ref_duplicate() {
1143        let (executor, context, _) = deterministic::Executor::default();
1144        test_spawn_ref_duplicate(executor, context);
1145    }
1146
1147    #[test]
1148    #[should_panic]
1149    fn test_deterministic_spawn_duplicate() {
1150        let (executor, context, _) = deterministic::Executor::default();
1151        test_spawn_duplicate(executor, context);
1152    }
1153
1154    #[test]
1155    fn test_deterministic_spawn_blocking() {
1156        let (executor, context, _) = deterministic::Executor::default();
1157        test_spawn_blocking(executor, context);
1158    }
1159
1160    #[test]
1161    #[should_panic(expected = "blocking task panicked")]
1162    fn test_deterministic_spawn_blocking_panic() {
1163        let (executor, context, _) = deterministic::Executor::default();
1164        executor.start(async move {
1165            let handle = context.spawn_blocking(|| {
1166                panic!("blocking task panicked");
1167            });
1168            handle.await.unwrap();
1169        });
1170    }
1171
1172    #[test]
1173    fn test_deterministic_spawn_blocking_abort() {
1174        let (executor, context, _) = deterministic::Executor::default();
1175        test_spawn_blocking_abort(executor, context);
1176    }
1177
1178    #[test]
1179    fn test_deterministic_metrics() {
1180        let (executor, context, _) = deterministic::Executor::default();
1181        test_metrics(executor, context);
1182    }
1183
1184    #[test]
1185    #[should_panic]
1186    fn test_deterministic_metrics_label() {
1187        let (executor, context, _) = deterministic::Executor::default();
1188        test_metrics_label(executor, context);
1189    }
1190
1191    #[test]
1192    fn test_deterministic_metrics_serve() {
1193        let (executor, context, _) = deterministic::Executor::default();
1194        test_metrics_serve(executor, context);
1195    }
1196
1197    #[test]
1198    fn test_tokio_error_future() {
1199        let (runner, _) = tokio::Executor::default();
1200        test_error_future(runner);
1201    }
1202
1203    #[test]
1204    fn test_tokio_clock_sleep() {
1205        let (executor, context) = tokio::Executor::default();
1206        test_clock_sleep(executor, context);
1207    }
1208
1209    #[test]
1210    fn test_tokio_clock_sleep_until() {
1211        let (executor, context) = tokio::Executor::default();
1212        test_clock_sleep_until(executor, context);
1213    }
1214
1215    #[test]
1216    fn test_tokio_root_finishes() {
1217        let (executor, context) = tokio::Executor::default();
1218        test_root_finishes(executor, context);
1219    }
1220
1221    #[test]
1222    fn test_tokio_spawn_abort() {
1223        let (executor, context) = tokio::Executor::default();
1224        test_spawn_abort(executor, context);
1225    }
1226
1227    #[test]
1228    fn test_tokio_panic_aborts_root() {
1229        let (runner, _) = tokio::Executor::default();
1230        test_panic_aborts_root(runner);
1231    }
1232
1233    #[test]
1234    fn test_tokio_panic_aborts_spawn() {
1235        let (executor, context) = tokio::Executor::default();
1236        test_panic_aborts_spawn(executor, context);
1237    }
1238
1239    #[test]
1240    fn test_tokio_select() {
1241        let (executor, _) = tokio::Executor::default();
1242        test_select(executor);
1243    }
1244
1245    #[test]
1246    fn test_tokio_select_loop() {
1247        let (executor, context) = tokio::Executor::default();
1248        test_select_loop(executor, context);
1249    }
1250
1251    #[test]
1252    fn test_tokio_storage_operations() {
1253        let (executor, context) = tokio::Executor::default();
1254        test_storage_operations(executor, context);
1255    }
1256
1257    #[test]
1258    fn test_tokio_blob_read_write() {
1259        let (executor, context) = tokio::Executor::default();
1260        test_blob_read_write(executor, context);
1261    }
1262
1263    #[test]
1264    fn test_tokio_many_partition_read_write() {
1265        let (executor, context) = tokio::Executor::default();
1266        test_many_partition_read_write(executor, context);
1267    }
1268
1269    #[test]
1270    fn test_tokio_blob_read_past_length() {
1271        let (executor, context) = tokio::Executor::default();
1272        test_blob_read_past_length(executor, context);
1273    }
1274
1275    #[test]
1276    fn test_tokio_blob_clone_and_concurrent_read() {
1277        // Run test
1278        let (executor, context) = tokio::Executor::default();
1279        test_blob_clone_and_concurrent_read(executor, context.clone());
1280
1281        // Ensure no blobs still open
1282        let buffer = context.encode();
1283        assert!(buffer.contains("open_blobs 0"));
1284    }
1285
1286    #[test]
1287    fn test_tokio_shutdown() {
1288        let (executor, context) = tokio::Executor::default();
1289        test_shutdown(executor, context);
1290    }
1291
1292    #[test]
1293    fn test_tokio_spawn_ref() {
1294        let (executor, context) = tokio::Executor::default();
1295        test_spawn_ref(executor, context);
1296    }
1297
1298    #[test]
1299    #[should_panic]
1300    fn test_tokio_spawn_ref_duplicate() {
1301        let (executor, context) = tokio::Executor::default();
1302        test_spawn_ref_duplicate(executor, context);
1303    }
1304
1305    #[test]
1306    #[should_panic]
1307    fn test_tokio_spawn_duplicate() {
1308        let (executor, context) = tokio::Executor::default();
1309        test_spawn_duplicate(executor, context);
1310    }
1311
1312    #[test]
1313    fn test_tokio_spawn_blocking() {
1314        let (executor, context) = tokio::Executor::default();
1315        test_spawn_blocking(executor, context);
1316    }
1317
1318    #[test]
1319    fn test_tokio_spawn_blocking_panic() {
1320        let (executor, context) = tokio::Executor::default();
1321        executor.start(async move {
1322            let handle = context.spawn_blocking(|| {
1323                panic!("blocking task panicked");
1324            });
1325            let result = handle.await;
1326            assert_eq!(result, Err(Error::Exited));
1327        });
1328    }
1329
1330    #[test]
1331    fn test_tokio_spawn_blocking_abort() {
1332        let (executor, context) = tokio::Executor::default();
1333        test_spawn_blocking_abort(executor, context);
1334    }
1335
1336    #[test]
1337    fn test_tokio_metrics() {
1338        let (executor, context) = tokio::Executor::default();
1339        test_metrics(executor, context);
1340    }
1341
1342    #[test]
1343    #[should_panic]
1344    fn test_tokio_metrics_label() {
1345        let (executor, context) = tokio::Executor::default();
1346        test_metrics_label(executor, context);
1347    }
1348
1349    #[test]
1350    fn test_tokio_metrics_serve() {
1351        let (executor, context) = tokio::Executor::default();
1352        test_metrics_serve(executor, context);
1353    }
1354}