commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::io::Error as IoError;
23use std::{
24    future::Future,
25    net::SocketAddr,
26    time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36    if #[cfg(not(target_arch = "wasm32"))] {
37        pub mod tokio;
38        pub mod benchmarks;
39    }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
/// Label prefix reserved for metrics emitted by the runtime itself.
const METRICS_PREFIX: &str = "runtime";
51
52/// Errors that can occur when interacting with the runtime.
53#[derive(Error, Debug)]
54pub enum Error {
55    #[error("exited")]
56    Exited,
57    #[error("closed")]
58    Closed,
59    #[error("timeout")]
60    Timeout,
61    #[error("bind failed")]
62    BindFailed,
63    #[error("connection failed")]
64    ConnectionFailed,
65    #[error("write failed")]
66    WriteFailed,
67    #[error("read failed")]
68    ReadFailed,
69    #[error("send failed")]
70    SendFailed,
71    #[error("recv failed")]
72    RecvFailed,
73    #[error("partition creation failed: {0}")]
74    PartitionCreationFailed(String),
75    #[error("partition missing: {0}")]
76    PartitionMissing(String),
77    #[error("partition corrupt: {0}")]
78    PartitionCorrupt(String),
79    #[error("blob open failed: {0}/{1} error: {2}")]
80    BlobOpenFailed(String, String, IoError),
81    #[error("blob missing: {0}/{1}")]
82    BlobMissing(String, String),
83    #[error("blob truncate failed: {0}/{1} error: {2}")]
84    BlobTruncateFailed(String, String, IoError),
85    #[error("blob sync failed: {0}/{1} error: {2}")]
86    BlobSyncFailed(String, String, IoError),
87    #[error("blob close failed: {0}/{1} error: {2}")]
88    BlobCloseFailed(String, String, IoError),
89    #[error("blob insufficient length")]
90    BlobInsufficientLength,
91    #[error("offset overflow")]
92    OffsetOverflow,
93}
94
/// Entry point of a task scheduler: anything that can drive a root task to completion.
pub trait Runner {
    /// The environment handed to the root task.
    ///
    /// A context may expose services such as:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Run the root task built by `f` and return that task's output.
    ///
    /// `f` receives the runtime's [Runner::Context] and produces the future to execute.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
111
112/// Interface that any task scheduler must implement to spawn tasks.
113pub trait Spawner: Clone + Send + Sync + 'static {
114    /// Enqueue a task to be executed.
115    ///
116    /// Unlike a future, a spawned task will start executing immediately (even if the caller
117    /// does not await the handle).
118    ///
119    /// Spawned tasks consume the context used to create them. This ensures that context cannot
120    /// be shared between tasks and that a task's context always comes from somewhere.
121    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
122    where
123        F: FnOnce(Self) -> Fut + Send + 'static,
124        Fut: Future<Output = T> + Send + 'static,
125        T: Send + 'static;
126
127    /// Enqueue a task to be executed (without consuming the context).
128    ///
129    /// The semantics are the same as [Spawner::spawn].
130    ///
131    /// # Warning
132    ///
133    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
134    /// to prevent accidental misuse.
135    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
136    where
137        F: Future<Output = T> + Send + 'static,
138        T: Send + 'static;
139
140    /// Enqueue a blocking task to be executed.
141    ///
142    /// This method is designed for synchronous, potentially long-running operations. Tasks can either
143    /// be executed in a shared thread (tasks that are expected to finish on their own) or a dedicated
144    /// thread (tasks that are expected to run indefinitely).
145    ///
146    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
147    /// result.
148    ///
149    /// # Motivation
150    ///
151    /// Most runtimes allocate a limited number of threads for executing async tasks, running whatever
152    /// isn't waiting. If blocking tasks are spawned this way, they can dramatically reduce the efficiency
153    /// of said runtimes.
154    ///
155    /// # Warning
156    ///
157    /// Blocking tasks cannot be aborted.
158    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
159    where
160        F: FnOnce(Self) -> T + Send + 'static,
161        T: Send + 'static;
162
163    /// Enqueue a blocking task to be executed (without consuming the context).
164    ///
165    /// The semantics are the same as [Spawner::spawn_blocking].
166    ///
167    /// # Warning
168    ///
169    /// If this function is used to spawn multiple tasks from the same context,
170    /// the runtime will panic to prevent accidental misuse.
171    fn spawn_blocking_ref<F, T>(
172        &mut self,
173        dedicated: bool,
174    ) -> impl FnOnce(F) -> Handle<T> + 'static
175    where
176        F: FnOnce() -> T + Send + 'static,
177        T: Send + 'static;
178
179    /// Signals the runtime to stop execution and that all outstanding tasks
180    /// should perform any required cleanup and exit. This method is idempotent and
181    /// can be called multiple times.
182    ///
183    /// This method does not actually kill any tasks but rather signals to them, using
184    /// the `Signal` returned by `stopped`, that they should exit.
185    fn stop(&self, value: i32);
186
187    /// Returns an instance of a `Signal` that resolves when `stop` is called by
188    /// any task.
189    ///
190    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
191    fn stopped(&self) -> Signal;
192}
193
194/// Interface to register and encode metrics.
195pub trait Metrics: Clone + Send + Sync + 'static {
196    /// Get the current label of the context.
197    fn label(&self) -> String;
198
199    /// Create a new instance of `Metrics` with the given label appended to the end
200    /// of the current `Metrics` label.
201    ///
202    /// This is commonly used to create a nested context for `register`.
203    ///
204    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
205    /// label (reserved for metrics for the runtime).
206    fn with_label(&self, label: &str) -> Self;
207
208    /// Prefix the given label with the current context's label.
209    ///
210    /// Unlike `with_label`, this method does not create a new context.
211    fn scoped_label(&self, label: &str) -> String {
212        let label = if self.label().is_empty() {
213            label.to_string()
214        } else {
215            format!("{}_{}", self.label(), label)
216        };
217        assert!(
218            !label.starts_with(METRICS_PREFIX),
219            "using runtime label is not allowed"
220        );
221        label
222    }
223
224    /// Register a metric with the runtime.
225    ///
226    /// Any registered metric will include (as a prefix) the label of the current context.
227    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
228
229    /// Encode all metrics into a buffer.
230    fn encode(&self) -> String;
231}
232
/// Time-related operations that a task scheduler must provide.
///
/// Routing time through this trait lets it be mocked, which is required for
/// deterministic execution of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// The current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Produce a future that sleeps for `duration`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Produce a future that sleeps until `deadline`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
248
249/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
250pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
251
252/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
253pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
254
255/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
256pub type ListenerOf<N> = <N as crate::Network>::Listener;
257
258/// Interface that any runtime must implement to create
259/// network connections.
260pub trait Network: Clone + Send + Sync + 'static {
261    /// The type of [Listener] that's returned when binding to a socket.
262    /// Accepting a connection returns a [Sink] and [Stream] which are defined
263    /// by the [Listener] and used to send and receive data over the connection.
264    type Listener: Listener;
265
266    /// Bind to the given socket address.
267    fn bind(
268        &self,
269        socket: SocketAddr,
270    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
271
272    /// Dial the given socket address.
273    fn dial(
274        &self,
275        socket: SocketAddr,
276    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
277}
278
279/// Interface that any runtime must implement to handle
280/// incoming network connections.
281pub trait Listener: Sync + Send + 'static {
282    /// The type of [Sink] that's returned when accepting a connection.
283    /// This is used to send data to the remote connection.
284    type Sink: Sink;
285    /// The type of [Stream] that's returned when accepting a connection.
286    /// This is used to receive data from the remote connection.
287    type Stream: Stream;
288
289    /// Accept an incoming connection.
290    fn accept(
291        &mut self,
292    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
293
294    /// Returns the local address of the listener.
295    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
296}
297
298/// Interface that any runtime must implement to send
299/// messages over a network connection.
300pub trait Sink: Sync + Send + 'static {
301    /// Send a message to the sink.
302    fn send(
303        &mut self,
304        msg: impl Into<StableBuf> + Send,
305    ) -> impl Future<Output = Result<(), Error>> + Send;
306}
307
308/// Interface that any runtime must implement to receive
309/// messages over a network connection.
310pub trait Stream: Sync + Send + 'static {
311    /// Receive a message from the stream, storing it in the given buffer.
312    /// Reads exactly the number of bytes that fit in the buffer.
313    fn recv(
314        &mut self,
315        buf: impl Into<StableBuf> + Send,
316    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
317}
318
319/// Interface to interact with storage.
320///
321///
322/// To support storage implementations that enable concurrent reads and
323/// writes, blobs are responsible for maintaining synchronization.
324///
325/// Storage can be backed by a local filesystem, cloud storage, etc.
326pub trait Storage: Clone + Send + Sync + 'static {
327    /// The readable/writeable storage buffer that can be opened by this Storage.
328    type Blob: Blob;
329
330    /// Open an existing blob in a given partition or create a new one, returning
331    /// the blob and its length.
332    ///
333    /// Multiple instances of the same blob can be opened concurrently, however,
334    /// writing to the same blob concurrently may lead to undefined behavior.
335    fn open(
336        &self,
337        partition: &str,
338        name: &[u8],
339    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
340
341    /// Remove a blob from a given partition.
342    ///
343    /// If no `name` is provided, the entire partition is removed.
344    fn remove(
345        &self,
346        partition: &str,
347        name: Option<&[u8]>,
348    ) -> impl Future<Output = Result<(), Error>> + Send;
349
350    /// Return all blobs in a given partition.
351    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
352}
353
354/// Interface to read and write to a blob.
355///
356/// To support blob implementations that enable concurrent reads and
357/// writes, blobs are responsible for maintaining synchronization.
358///
359/// Cloning a blob is similar to wrapping a single file descriptor in
360/// a lock whereas opening a new blob (of the same name) is similar to
361/// opening a new file descriptor. If multiple blobs are opened with the same
362/// name, they are not expected to coordinate access to underlying storage
363/// and writing to both is undefined behavior.
364#[allow(clippy::len_without_is_empty)]
365pub trait Blob: Clone + Send + Sync + 'static {
366    /// Read from the blob at the given offset.
367    ///
368    /// `read_at` does not return the number of bytes read because it
369    /// only returns once the entire buffer has been filled.
370    fn read_at(
371        &self,
372        buf: impl Into<StableBuf> + Send,
373        offset: u64,
374    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
375
376    /// Write `buf` to the blob at the given offset.
377    fn write_at(
378        &self,
379        buf: impl Into<StableBuf> + Send,
380        offset: u64,
381    ) -> impl Future<Output = Result<(), Error>> + Send;
382
383    /// Truncate the blob to the given length.
384    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
385
386    /// Ensure all pending data is durably persisted.
387    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
388
389    /// Close the blob.
390    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
391}
392
393#[cfg(test)]
394mod tests {
395    use super::*;
396    use bytes::Bytes;
397    use commonware_macros::select;
398    use futures::channel::oneshot;
399    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
400    use prometheus_client::metrics::counter::Counter;
401    use std::collections::HashMap;
402    use std::panic::{catch_unwind, AssertUnwindSafe};
403    use std::str::FromStr;
404    use std::sync::Mutex;
405    use tracing::{error, Level};
406    use utils::reschedule;
407
408    fn test_error_future<R: Runner>(runner: R) {
409        async fn error_future() -> Result<&'static str, &'static str> {
410            Err("An error occurred")
411        }
412        let result = runner.start(|_| error_future());
413        assert_eq!(result, Err("An error occurred"));
414    }
415
416    fn test_clock_sleep<R: Runner>(runner: R)
417    where
418        R::Context: Spawner + Clock,
419    {
420        runner.start(|context| async move {
421            // Capture initial time
422            let start = context.current();
423            let sleep_duration = Duration::from_millis(10);
424            context.sleep(sleep_duration).await;
425
426            // After run, time should have advanced
427            let end = context.current();
428            assert!(end.duration_since(start).unwrap() >= sleep_duration);
429        });
430    }
431
432    fn test_clock_sleep_until<R: Runner>(runner: R)
433    where
434        R::Context: Spawner + Clock,
435    {
436        runner.start(|context| async move {
437            // Trigger sleep
438            let now = context.current();
439            context.sleep_until(now + Duration::from_millis(100)).await;
440
441            // Ensure slept duration has elapsed
442            let elapsed = now.elapsed().unwrap();
443            assert!(elapsed >= Duration::from_millis(100));
444        });
445    }
446
447    fn test_root_finishes<R: Runner>(runner: R)
448    where
449        R::Context: Spawner,
450    {
451        runner.start(|context| async move {
452            context.spawn(|_| async move {
453                loop {
454                    reschedule().await;
455                }
456            });
457        });
458    }
459
460    fn test_spawn_abort<R: Runner>(runner: R)
461    where
462        R::Context: Spawner,
463    {
464        runner.start(|context| async move {
465            let handle = context.spawn(|_| async move {
466                loop {
467                    reschedule().await;
468                }
469            });
470            handle.abort();
471            assert!(matches!(handle.await, Err(Error::Closed)));
472        });
473    }
474
475    fn test_panic_aborts_root<R: Runner>(runner: R) {
476        let result = catch_unwind(AssertUnwindSafe(|| {
477            runner.start(|_| async move {
478                panic!("blah");
479            });
480        }));
481        result.unwrap_err();
482    }
483
484    fn test_panic_aborts_spawn<R: Runner>(runner: R)
485    where
486        R::Context: Spawner,
487    {
488        let result = runner.start(|context| async move {
489            let result = context.spawn(|_| async move {
490                panic!("blah");
491            });
492            assert!(matches!(result.await, Err(Error::Exited)));
493            Result::<(), Error>::Ok(())
494        });
495
496        // Ensure panic was caught
497        result.unwrap();
498    }
499
500    fn test_select<R: Runner>(runner: R) {
501        runner.start(|_| async move {
502            // Test first branch
503            let output = Mutex::new(0);
504            select! {
505                v1 = ready(1) => {
506                    *output.lock().unwrap() = v1;
507                },
508                v2 = ready(2) => {
509                    *output.lock().unwrap() = v2;
510                },
511            };
512            assert_eq!(*output.lock().unwrap(), 1);
513
514            // Test second branch
515            select! {
516                v1 = std::future::pending::<i32>() => {
517                    *output.lock().unwrap() = v1;
518                },
519                v2 = ready(2) => {
520                    *output.lock().unwrap() = v2;
521                },
522            };
523            assert_eq!(*output.lock().unwrap(), 2);
524        });
525    }
526
527    /// Ensure future fusing works as expected.
528    fn test_select_loop<R: Runner>(runner: R)
529    where
530        R::Context: Clock,
531    {
532        runner.start(|context| async move {
533            // Should hit timeout
534            let (mut sender, mut receiver) = mpsc::unbounded();
535            for _ in 0..2 {
536                select! {
537                    v = receiver.next() => {
538                        panic!("unexpected value: {:?}", v);
539                    },
540                    _ = context.sleep(Duration::from_millis(100)) => {
541                        continue;
542                    },
543                };
544            }
545
546            // Populate channel
547            sender.send(0).await.unwrap();
548            sender.send(1).await.unwrap();
549
550            // Prefer not reading channel without losing messages
551            select! {
552                _ = async {} => {
553                    // Skip reading from channel even though populated
554                },
555                v = receiver.next() => {
556                    panic!("unexpected value: {:?}", v);
557                },
558            };
559
560            // Process messages
561            for i in 0..2 {
562                select! {
563                    _ = context.sleep(Duration::from_millis(100)) => {
564                        panic!("timeout");
565                    },
566                    v = receiver.next() => {
567                        assert_eq!(v.unwrap(), i);
568                    },
569                };
570            }
571        });
572    }
573
574    fn test_storage_operations<R: Runner>(runner: R)
575    where
576        R::Context: Storage,
577    {
578        runner.start(|context| async move {
579            let partition = "test_partition";
580            let name = b"test_blob";
581
582            // Open a new blob
583            let (blob, _) = context
584                .open(partition, name)
585                .await
586                .expect("Failed to open blob");
587
588            // Write data to the blob
589            let data = b"Hello, Storage!";
590            blob.write_at(Vec::from(data), 0)
591                .await
592                .expect("Failed to write to blob");
593
594            // Sync the blob
595            blob.sync().await.expect("Failed to sync blob");
596
597            // Read data from the blob
598            let read = blob
599                .read_at(vec![0; data.len()], 0)
600                .await
601                .expect("Failed to read from blob");
602            assert_eq!(read.as_ref(), data);
603
604            // Close the blob
605            blob.close().await.expect("Failed to close blob");
606
607            // Scan blobs in the partition
608            let blobs = context
609                .scan(partition)
610                .await
611                .expect("Failed to scan partition");
612            assert!(blobs.contains(&name.to_vec()));
613
614            // Reopen the blob
615            let (blob, len) = context
616                .open(partition, name)
617                .await
618                .expect("Failed to reopen blob");
619            assert_eq!(len, data.len() as u64);
620
621            // Read data part of message back
622            let read = blob
623                .read_at(vec![0u8; 7], 7)
624                .await
625                .expect("Failed to read data");
626            assert_eq!(read.as_ref(), b"Storage");
627
628            // Close the blob
629            blob.close().await.expect("Failed to close blob");
630
631            // Remove the blob
632            context
633                .remove(partition, Some(name))
634                .await
635                .expect("Failed to remove blob");
636
637            // Ensure the blob is removed
638            let blobs = context
639                .scan(partition)
640                .await
641                .expect("Failed to scan partition");
642            assert!(!blobs.contains(&name.to_vec()));
643
644            // Remove the partition
645            context
646                .remove(partition, None)
647                .await
648                .expect("Failed to remove partition");
649
650            // Scan the partition
651            let result = context.scan(partition).await;
652            assert!(matches!(result, Err(Error::PartitionMissing(_))));
653        });
654    }
655
656    fn test_blob_read_write<R: Runner>(runner: R)
657    where
658        R::Context: Storage,
659    {
660        runner.start(|context| async move {
661            let partition = "test_partition";
662            let name = b"test_blob_rw";
663
664            // Open a new blob
665            let (blob, _) = context
666                .open(partition, name)
667                .await
668                .expect("Failed to open blob");
669
670            // Write data at different offsets
671            let data1 = b"Hello";
672            let data2 = b"World";
673            blob.write_at(Vec::from(data1), 0)
674                .await
675                .expect("Failed to write data1");
676            blob.write_at(Vec::from(data2), 5)
677                .await
678                .expect("Failed to write data2");
679
680            // Read data back
681            let read = blob
682                .read_at(vec![0u8; 10], 0)
683                .await
684                .expect("Failed to read data");
685            assert_eq!(&read.as_ref()[..5], data1);
686            assert_eq!(&read.as_ref()[5..], data2);
687
688            // Rewrite data without affecting length
689            let data3 = b"Store";
690            blob.write_at(Vec::from(data3), 5)
691                .await
692                .expect("Failed to write data3");
693
694            // Truncate the blob
695            blob.truncate(5).await.expect("Failed to truncate blob");
696            let read = blob
697                .read_at(vec![0; 5], 0)
698                .await
699                .expect("Failed to read data");
700            assert_eq!(&read.as_ref()[..5], data1);
701
702            // Full read after truncation
703            let result = blob.read_at(vec![0u8; 10], 0).await;
704            assert!(result.is_err());
705
706            // Close the blob
707            blob.close().await.expect("Failed to close blob");
708        });
709    }
710
711    fn test_many_partition_read_write<R: Runner>(runner: R)
712    where
713        R::Context: Storage,
714    {
715        runner.start(|context| async move {
716            let partitions = ["partition1", "partition2", "partition3"];
717            let name = b"test_blob_rw";
718            let data1 = b"Hello";
719            let data2 = b"World";
720
721            for (additional, partition) in partitions.iter().enumerate() {
722                // Open a new blob
723                let (blob, _) = context
724                    .open(partition, name)
725                    .await
726                    .expect("Failed to open blob");
727
728                // Write data at different offsets
729                blob.write_at(Vec::from(data1), 0)
730                    .await
731                    .expect("Failed to write data1");
732                blob.write_at(Vec::from(data2), 5 + additional as u64)
733                    .await
734                    .expect("Failed to write data2");
735
736                // Close the blob
737                blob.close().await.expect("Failed to close blob");
738            }
739
740            for (additional, partition) in partitions.iter().enumerate() {
741                // Open a new blob
742                let (blob, len) = context
743                    .open(partition, name)
744                    .await
745                    .expect("Failed to open blob");
746                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
747
748                // Read data back
749                let read = blob
750                    .read_at(vec![0u8; 10 + additional], 0)
751                    .await
752                    .expect("Failed to read data");
753                assert_eq!(&read.as_ref()[..5], b"Hello");
754                assert_eq!(&read.as_ref()[5 + additional..], b"World");
755
756                // Close the blob
757                blob.close().await.expect("Failed to close blob");
758            }
759        });
760    }
761
762    fn test_blob_read_past_length<R: Runner>(runner: R)
763    where
764        R::Context: Storage,
765    {
766        runner.start(|context| async move {
767            let partition = "test_partition";
768            let name = b"test_blob_rw";
769
770            // Open a new blob
771            let (blob, _) = context
772                .open(partition, name)
773                .await
774                .expect("Failed to open blob");
775
776            // Read data past file length (empty file)
777            let result = blob.read_at(vec![0u8; 10], 0).await;
778            assert!(result.is_err());
779
780            // Write data to the blob
781            let data = b"Hello, Storage!".to_vec();
782            blob.write_at(data, 0)
783                .await
784                .expect("Failed to write to blob");
785
786            // Read data past file length (non-empty file)
787            let result = blob.read_at(vec![0u8; 20], 0).await;
788            assert!(result.is_err());
789        })
790    }
791
792    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
793    where
794        R::Context: Spawner + Storage + Metrics,
795    {
796        runner.start(|context| async move {
797            let partition = "test_partition";
798            let name = b"test_blob_rw";
799
800            // Open a new blob
801            let (blob, _) = context
802                .open(partition, name)
803                .await
804                .expect("Failed to open blob");
805
806            // Write data to the blob
807            let data = b"Hello, Storage!";
808            blob.write_at(Vec::from(data), 0)
809                .await
810                .expect("Failed to write to blob");
811
812            // Sync the blob
813            blob.sync().await.expect("Failed to sync blob");
814
815            // Read data from the blob in clone
816            let check1 = context.with_label("check1").spawn({
817                let blob = blob.clone();
818                move |_| async move {
819                    let read = blob
820                        .read_at(vec![0u8; data.len()], 0)
821                        .await
822                        .expect("Failed to read from blob");
823                    assert_eq!(read.as_ref(), data);
824                }
825            });
826            let check2 = context.with_label("check2").spawn({
827                let blob = blob.clone();
828                move |_| async move {
829                    let read = blob
830                        .read_at(vec![0; data.len()], 0)
831                        .await
832                        .expect("Failed to read from blob");
833                    assert_eq!(read.as_ref(), data);
834                }
835            });
836
837            // Wait for both reads to complete
838            let result = join!(check1, check2);
839            assert!(result.0.is_ok());
840            assert!(result.1.is_ok());
841
842            // Read data from the blob
843            let read = blob
844                .read_at(vec![0; data.len()], 0)
845                .await
846                .expect("Failed to read from blob");
847            assert_eq!(read.as_ref(), data);
848
849            // Close the blob
850            blob.close().await.expect("Failed to close blob");
851
852            // Ensure no blobs still open
853            let buffer = context.encode();
854            assert!(buffer.contains("open_blobs 0"));
855        });
856    }
857
858    fn test_shutdown<R: Runner>(runner: R)
859    where
860        R::Context: Spawner + Metrics + Clock,
861    {
862        let kill = 9;
863        runner.start(|context| async move {
864            // Spawn a task that waits for signal
865            let before = context
866                .with_label("before")
867                .spawn(move |context| async move {
868                    let sig = context.stopped().await;
869                    assert_eq!(sig.unwrap(), kill);
870                });
871
872            // Spawn a task after stop is called
873            let after = context
874                .with_label("after")
875                .spawn(move |context| async move {
876                    // Wait for stop signal
877                    let mut signal = context.stopped();
878                    loop {
879                        select! {
880                            sig = &mut signal => {
881                                // Stopper resolved
882                                assert_eq!(sig.unwrap(), kill);
883                                break;
884                            },
885                            _ = context.sleep(Duration::from_millis(10)) => {
886                                // Continue waiting
887                            },
888                        }
889                    }
890                });
891
892            // Sleep for a bit before stopping
893            context.sleep(Duration::from_millis(50)).await;
894
895            // Signal the task
896            context.stop(kill);
897
898            // Ensure both tasks complete
899            let result = join!(before, after);
900            assert!(result.0.is_ok());
901            assert!(result.1.is_ok());
902        });
903    }
904
905    fn test_spawn_ref<R: Runner>(runner: R)
906    where
907        R::Context: Spawner,
908    {
909        runner.start(|mut context| async move {
910            let handle = context.spawn_ref();
911            let result = handle(async move { 42 }).await;
912            assert!(matches!(result, Ok(42)));
913        });
914    }
915
916    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
917    where
918        R::Context: Spawner,
919    {
920        runner.start(|mut context| async move {
921            let handle = context.spawn_ref();
922            let result = handle(async move { 42 }).await;
923            assert!(matches!(result, Ok(42)));
924
925            // Ensure context is consumed
926            let handle = context.spawn_ref();
927            let result = handle(async move { 42 }).await;
928            assert!(matches!(result, Ok(42)));
929        });
930    }
931
932    fn test_spawn_duplicate<R: Runner>(runner: R)
933    where
934        R::Context: Spawner,
935    {
936        runner.start(|mut context| async move {
937            let handle = context.spawn_ref();
938            let result = handle(async move { 42 }).await;
939            assert!(matches!(result, Ok(42)));
940
941            // Ensure context is consumed
942            context.spawn(|_| async move { 42 });
943        });
944    }
945
946    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
947    where
948        R::Context: Spawner,
949    {
950        runner.start(|context| async move {
951            let handle = context.spawn_blocking(dedicated, |_| 42);
952            let result = handle.await;
953            assert!(matches!(result, Ok(42)));
954        });
955    }
956
957    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
958    where
959        R::Context: Spawner,
960    {
961        runner.start(|mut context| async move {
962            let spawn = context.spawn_blocking_ref(dedicated);
963            let handle = spawn(|| 42);
964            let result = handle.await;
965            assert!(matches!(result, Ok(42)));
966        });
967    }
968
969    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
970    where
971        R::Context: Spawner,
972    {
973        runner.start(|mut context| async move {
974            let spawn = context.spawn_blocking_ref(dedicated);
975            let result = spawn(|| 42).await;
976            assert!(matches!(result, Ok(42)));
977
978            // Ensure context is consumed
979            context.spawn_blocking(dedicated, |_| 42);
980        });
981    }
982
983    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
984    where
985        R::Context: Spawner,
986    {
987        runner.start(|context| async move {
988            // Create task
989            let (sender, mut receiver) = oneshot::channel();
990            let handle = context.spawn_blocking(dedicated, move |_| {
991                // Wait for abort to be called
992                loop {
993                    if receiver.try_recv().is_ok() {
994                        break;
995                    }
996                }
997
998                // Perform a long-running operation
999                let mut count = 0;
1000                loop {
1001                    count += 1;
1002                    if count >= 100_000_000 {
1003                        break;
1004                    }
1005                }
1006                count
1007            });
1008
1009            // Abort the task
1010            //
1011            // If there was an `.await` prior to sending a message over the oneshot, this test
1012            // could deadlock (depending on the runtime implementation) because the blocking task
1013            // would never yield (preventing send from being called).
1014            handle.abort();
1015            sender.send(()).unwrap();
1016
1017            // Wait for the task to complete
1018            assert!(matches!(handle.await, Ok(100_000_000)));
1019        });
1020    }
1021
1022    fn test_metrics<R: Runner>(runner: R)
1023    where
1024        R::Context: Metrics,
1025    {
1026        runner.start(|context| async move {
1027            // Assert label
1028            assert_eq!(context.label(), "");
1029
1030            // Register a metric
1031            let counter = Counter::<u64>::default();
1032            context.register("test", "test", counter.clone());
1033
1034            // Increment the counter
1035            counter.inc();
1036
1037            // Encode metrics
1038            let buffer = context.encode();
1039            assert!(buffer.contains("test_total 1"));
1040
1041            // Nested context
1042            let context = context.with_label("nested");
1043            let nested_counter = Counter::<u64>::default();
1044            context.register("test", "test", nested_counter.clone());
1045
1046            // Increment the counter
1047            nested_counter.inc();
1048
1049            // Encode metrics
1050            let buffer = context.encode();
1051            assert!(buffer.contains("nested_test_total 1"));
1052            assert!(buffer.contains("test_total 1"));
1053        });
1054    }
1055
1056    fn test_metrics_label<R: Runner>(runner: R)
1057    where
1058        R::Context: Metrics,
1059    {
1060        runner.start(|context| async move {
1061            context.with_label(METRICS_PREFIX);
1062        })
1063    }
1064
1065    #[test]
1066    fn test_deterministic_future() {
1067        let runner = deterministic::Runner::default();
1068        test_error_future(runner);
1069    }
1070
1071    #[test]
1072    fn test_deterministic_clock_sleep() {
1073        let executor = deterministic::Runner::default();
1074        test_clock_sleep(executor);
1075    }
1076
1077    #[test]
1078    fn test_deterministic_clock_sleep_until() {
1079        let executor = deterministic::Runner::default();
1080        test_clock_sleep_until(executor);
1081    }
1082
1083    #[test]
1084    fn test_deterministic_root_finishes() {
1085        let executor = deterministic::Runner::default();
1086        test_root_finishes(executor);
1087    }
1088
1089    #[test]
1090    fn test_deterministic_spawn_abort() {
1091        let executor = deterministic::Runner::default();
1092        test_spawn_abort(executor);
1093    }
1094
1095    #[test]
1096    fn test_deterministic_panic_aborts_root() {
1097        let runner = deterministic::Runner::default();
1098        test_panic_aborts_root(runner);
1099    }
1100
1101    #[test]
1102    #[should_panic(expected = "blah")]
1103    fn test_deterministic_panic_aborts_spawn() {
1104        let executor = deterministic::Runner::default();
1105        test_panic_aborts_spawn(executor);
1106    }
1107
1108    #[test]
1109    fn test_deterministic_select() {
1110        let executor = deterministic::Runner::default();
1111        test_select(executor);
1112    }
1113
1114    #[test]
1115    fn test_deterministic_select_loop() {
1116        let executor = deterministic::Runner::default();
1117        test_select_loop(executor);
1118    }
1119
1120    #[test]
1121    fn test_deterministic_storage_operations() {
1122        let executor = deterministic::Runner::default();
1123        test_storage_operations(executor);
1124    }
1125
1126    #[test]
1127    fn test_deterministic_blob_read_write() {
1128        let executor = deterministic::Runner::default();
1129        test_blob_read_write(executor);
1130    }
1131
1132    #[test]
1133    fn test_deterministic_many_partition_read_write() {
1134        let executor = deterministic::Runner::default();
1135        test_many_partition_read_write(executor);
1136    }
1137
1138    #[test]
1139    fn test_deterministic_blob_read_past_length() {
1140        let executor = deterministic::Runner::default();
1141        test_blob_read_past_length(executor);
1142    }
1143
1144    #[test]
1145    fn test_deterministic_blob_clone_and_concurrent_read() {
1146        // Run test
1147        let executor = deterministic::Runner::default();
1148        test_blob_clone_and_concurrent_read(executor);
1149    }
1150
1151    #[test]
1152    fn test_deterministic_shutdown() {
1153        let executor = deterministic::Runner::default();
1154        test_shutdown(executor);
1155    }
1156
1157    #[test]
1158    fn test_deterministic_spawn_ref() {
1159        let executor = deterministic::Runner::default();
1160        test_spawn_ref(executor);
1161    }
1162
1163    #[test]
1164    #[should_panic]
1165    fn test_deterministic_spawn_ref_duplicate() {
1166        let executor = deterministic::Runner::default();
1167        test_spawn_ref_duplicate(executor);
1168    }
1169
1170    #[test]
1171    #[should_panic]
1172    fn test_deterministic_spawn_duplicate() {
1173        let executor = deterministic::Runner::default();
1174        test_spawn_duplicate(executor);
1175    }
1176
1177    #[test]
1178    fn test_deterministic_spawn_blocking() {
1179        for dedicated in [false, true] {
1180            let executor = deterministic::Runner::default();
1181            test_spawn_blocking(executor, dedicated);
1182        }
1183    }
1184
1185    #[test]
1186    #[should_panic(expected = "blocking task panicked")]
1187    fn test_deterministic_spawn_blocking_panic() {
1188        for dedicated in [false, true] {
1189            let executor = deterministic::Runner::default();
1190            executor.start(|context| async move {
1191                let handle = context.spawn_blocking(dedicated, |_| {
1192                    panic!("blocking task panicked");
1193                });
1194                handle.await.unwrap();
1195            });
1196        }
1197    }
1198
1199    #[test]
1200    fn test_deterministic_spawn_blocking_abort() {
1201        for dedicated in [false, true] {
1202            let executor = deterministic::Runner::default();
1203            test_spawn_blocking_abort(executor, dedicated);
1204        }
1205    }
1206
1207    #[test]
1208    fn test_deterministic_spawn_blocking_ref() {
1209        for dedicated in [false, true] {
1210            let executor = deterministic::Runner::default();
1211            test_spawn_blocking_ref(executor, dedicated);
1212        }
1213    }
1214
1215    #[test]
1216    #[should_panic]
1217    fn test_deterministic_spawn_blocking_ref_duplicate() {
1218        for dedicated in [false, true] {
1219            let executor = deterministic::Runner::default();
1220            test_spawn_blocking_ref_duplicate(executor, dedicated);
1221        }
1222    }
1223
1224    #[test]
1225    fn test_deterministic_metrics() {
1226        let executor = deterministic::Runner::default();
1227        test_metrics(executor);
1228    }
1229
1230    #[test]
1231    #[should_panic]
1232    fn test_deterministic_metrics_label() {
1233        let executor = deterministic::Runner::default();
1234        test_metrics_label(executor);
1235    }
1236
1237    #[test]
1238    fn test_tokio_error_future() {
1239        let runner = tokio::Runner::default();
1240        test_error_future(runner);
1241    }
1242
1243    #[test]
1244    fn test_tokio_clock_sleep() {
1245        let executor = tokio::Runner::default();
1246        test_clock_sleep(executor);
1247    }
1248
1249    #[test]
1250    fn test_tokio_clock_sleep_until() {
1251        let executor = tokio::Runner::default();
1252        test_clock_sleep_until(executor);
1253    }
1254
1255    #[test]
1256    fn test_tokio_root_finishes() {
1257        let executor = tokio::Runner::default();
1258        test_root_finishes(executor);
1259    }
1260
1261    #[test]
1262    fn test_tokio_spawn_abort() {
1263        let executor = tokio::Runner::default();
1264        test_spawn_abort(executor);
1265    }
1266
1267    #[test]
1268    fn test_tokio_panic_aborts_root() {
1269        let executor = tokio::Runner::default();
1270        test_panic_aborts_root(executor);
1271    }
1272
1273    #[test]
1274    fn test_tokio_panic_aborts_spawn() {
1275        let executor = tokio::Runner::default();
1276        test_panic_aborts_spawn(executor);
1277    }
1278
1279    #[test]
1280    fn test_tokio_select() {
1281        let executor = tokio::Runner::default();
1282        test_select(executor);
1283    }
1284
1285    #[test]
1286    fn test_tokio_select_loop() {
1287        let executor = tokio::Runner::default();
1288        test_select_loop(executor);
1289    }
1290
1291    #[test]
1292    fn test_tokio_storage_operations() {
1293        let executor = tokio::Runner::default();
1294        test_storage_operations(executor);
1295    }
1296
1297    #[test]
1298    fn test_tokio_blob_read_write() {
1299        let executor = tokio::Runner::default();
1300        test_blob_read_write(executor);
1301    }
1302
1303    #[test]
1304    fn test_tokio_many_partition_read_write() {
1305        let executor = tokio::Runner::default();
1306        test_many_partition_read_write(executor);
1307    }
1308
1309    #[test]
1310    fn test_tokio_blob_read_past_length() {
1311        let executor = tokio::Runner::default();
1312        test_blob_read_past_length(executor);
1313    }
1314
1315    #[test]
1316    fn test_tokio_blob_clone_and_concurrent_read() {
1317        // Run test
1318        let executor = tokio::Runner::default();
1319        test_blob_clone_and_concurrent_read(executor);
1320    }
1321
1322    #[test]
1323    fn test_tokio_shutdown() {
1324        let executor = tokio::Runner::default();
1325        test_shutdown(executor);
1326    }
1327
1328    #[test]
1329    fn test_tokio_spawn_ref() {
1330        let executor = tokio::Runner::default();
1331        test_spawn_ref(executor);
1332    }
1333
1334    #[test]
1335    #[should_panic]
1336    fn test_tokio_spawn_ref_duplicate() {
1337        let executor = tokio::Runner::default();
1338        test_spawn_ref_duplicate(executor);
1339    }
1340
1341    #[test]
1342    #[should_panic]
1343    fn test_tokio_spawn_duplicate() {
1344        let executor = tokio::Runner::default();
1345        test_spawn_duplicate(executor);
1346    }
1347
1348    #[test]
1349    fn test_tokio_spawn_blocking() {
1350        for dedicated in [false, true] {
1351            let executor = tokio::Runner::default();
1352            test_spawn_blocking(executor, dedicated);
1353        }
1354    }
1355
1356    #[test]
1357    fn test_tokio_spawn_blocking_panic() {
1358        for dedicated in [false, true] {
1359            let executor = tokio::Runner::default();
1360            executor.start(|context| async move {
1361                let handle = context.spawn_blocking(dedicated, |_| {
1362                    panic!("blocking task panicked");
1363                });
1364                let result = handle.await;
1365                assert!(matches!(result, Err(Error::Exited)));
1366            });
1367        }
1368    }
1369
1370    #[test]
1371    fn test_tokio_spawn_blocking_abort() {
1372        for dedicated in [false, true] {
1373            let executor = tokio::Runner::default();
1374            test_spawn_blocking_abort(executor, dedicated);
1375        }
1376    }
1377
1378    #[test]
1379    fn test_tokio_spawn_blocking_ref() {
1380        for dedicated in [false, true] {
1381            let executor = tokio::Runner::default();
1382            test_spawn_blocking_ref(executor, dedicated);
1383        }
1384    }
1385
1386    #[test]
1387    #[should_panic]
1388    fn test_tokio_spawn_blocking_ref_duplicate() {
1389        for dedicated in [false, true] {
1390            let executor = tokio::Runner::default();
1391            test_spawn_blocking_ref_duplicate(executor, dedicated);
1392        }
1393    }
1394
1395    #[test]
1396    fn test_tokio_metrics() {
1397        let executor = tokio::Runner::default();
1398        test_metrics(executor);
1399    }
1400
1401    #[test]
1402    #[should_panic]
1403    fn test_tokio_metrics_label() {
1404        let executor = tokio::Runner::default();
1405        test_metrics_label(executor);
1406    }
1407
1408    #[test]
1409    fn test_tokio_telemetry() {
1410        let executor = tokio::Runner::default();
1411        executor.start(|context| async move {
1412            // Define the server address
1413            let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1414
1415            // Configure telemetry
1416            tokio::telemetry::init(
1417                context.with_label("metrics"),
1418                tokio::telemetry::Logging {
1419                    level: Level::INFO,
1420                    json: false,
1421                },
1422                Some(address),
1423                None,
1424            );
1425
1426            // Register a test metric
1427            let counter: Counter<u64> = Counter::default();
1428            context.register("test_counter", "Test counter", counter.clone());
1429            counter.inc();
1430
1431            // Helper functions to parse HTTP response
1432            async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1433                let mut line = Vec::new();
1434                loop {
1435                    let byte = stream.recv(vec![0; 1]).await?;
1436                    if byte[0] == b'\n' {
1437                        if line.last() == Some(&b'\r') {
1438                            line.pop(); // Remove trailing \r
1439                        }
1440                        break;
1441                    }
1442                    line.push(byte[0]);
1443                }
1444                String::from_utf8(line).map_err(|_| Error::ReadFailed)
1445            }
1446
1447            async fn read_headers<St: Stream>(
1448                stream: &mut St,
1449            ) -> Result<HashMap<String, String>, Error> {
1450                let mut headers = HashMap::new();
1451                loop {
1452                    let line = read_line(stream).await?;
1453                    if line.is_empty() {
1454                        break;
1455                    }
1456                    let parts: Vec<&str> = line.splitn(2, ": ").collect();
1457                    if parts.len() == 2 {
1458                        headers.insert(parts[0].to_string(), parts[1].to_string());
1459                    }
1460                }
1461                Ok(headers)
1462            }
1463
1464            async fn read_body<St: Stream>(
1465                stream: &mut St,
1466                content_length: usize,
1467            ) -> Result<String, Error> {
1468                let read = stream.recv(vec![0; content_length]).await?;
1469                String::from_utf8(read.as_ref().to_vec()).map_err(|_| Error::ReadFailed)
1470            }
1471
1472            // Simulate a client connecting to the server
1473            let client_handle = context
1474                .with_label("client")
1475                .spawn(move |context| async move {
1476                    let (mut sink, mut stream) = loop {
1477                        match context.dial(address).await {
1478                            Ok((sink, stream)) => break (sink, stream),
1479                            Err(e) => {
1480                                // The client may be polled before the server is ready, that's alright!
1481                                error!(err =?e, "failed to connect");
1482                                context.sleep(Duration::from_millis(10)).await;
1483                            }
1484                        }
1485                    };
1486
1487                    // Send a GET request to the server
1488                    let request = format!(
1489                        "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1490                        address
1491                    );
1492                    sink.send(Bytes::from(request).to_vec()).await.unwrap();
1493
1494                    // Read and verify the HTTP status line
1495                    let status_line = read_line(&mut stream).await.unwrap();
1496                    assert_eq!(status_line, "HTTP/1.1 200 OK");
1497
1498                    // Read and parse headers
1499                    let headers = read_headers(&mut stream).await.unwrap();
1500                    println!("Headers: {:?}", headers);
1501                    let content_length = headers
1502                        .get("content-length")
1503                        .unwrap()
1504                        .parse::<usize>()
1505                        .unwrap();
1506
1507                    // Read and verify the body
1508                    let body = read_body(&mut stream, content_length).await.unwrap();
1509                    assert!(body.contains("test_counter_total 1"));
1510                });
1511
1512            // Wait for the client task to complete
1513            client_handle.await.unwrap();
1514        });
1515    }
1516}