commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use commonware_utils::{StableBuf, StableBufMut};
21use prometheus_client::registry::Metric;
22use std::io::Error as IoError;
23use std::{
24    future::Future,
25    net::SocketAddr,
26    time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36    if #[cfg(not(target_arch = "wasm32"))] {
37        pub mod tokio;
38        pub mod benchmarks;
39    }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
/// Prefix for runtime metrics.
///
/// This prefix is reserved for the runtime's own metrics: `Metrics::scoped_label`
/// asserts that no scoped label starts with this value.
const METRICS_PREFIX: &str = "runtime";
51
/// Errors that can occur when interacting with the runtime.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
/// Variants that carry two `String`s render them as `{0}/{1}` (presumably the
/// partition followed by the blob name — confirm against the storage implementations).
#[derive(Error, Debug)]
pub enum Error {
    /// Rendered as `exited`. The shared tests in this file expect this variant when
    /// awaiting the handle of a spawned task that panicked.
    #[error("exited")]
    Exited,
    /// Rendered as `closed`. The shared tests in this file expect this variant when
    /// awaiting the handle of a task that was aborted.
    #[error("closed")]
    Closed,
    /// Rendered as `timeout`.
    #[error("timeout")]
    Timeout,
    /// Rendered as `bind failed`.
    #[error("bind failed")]
    BindFailed,
    /// Rendered as `connection failed`.
    #[error("connection failed")]
    ConnectionFailed,
    /// Rendered as `write failed`.
    #[error("write failed")]
    WriteFailed,
    /// Rendered as `read failed`.
    #[error("read failed")]
    ReadFailed,
    /// Rendered as `send failed`.
    #[error("send failed")]
    SendFailed,
    /// Rendered as `recv failed`.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the payload is included in the message.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist; the payload is included in the message.
    ///
    /// The shared tests in this file expect this variant when scanning a partition
    /// that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt; the payload is included in the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed; carries the underlying I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Truncating a blob failed; carries the underlying I/O error.
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    /// Syncing a blob failed; carries the underlying I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed; carries the underlying I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// Rendered as `blob insufficient length` (presumably returned when an operation
    /// needs more bytes than the blob holds — confirm against the implementations).
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// Rendered as `offset overflow`.
    #[error("offset overflow")]
    OffsetOverflow,
}
94
/// Interface that any task scheduler must implement to start
/// running tasks.
pub trait Runner {
    /// Context defines the environment available to tasks.
    /// Examples of possible services provided by the context include:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Start running a root task.
    ///
    /// `f` receives the runtime's [Runner::Context] and returns the root future.
    /// The runner is consumed, and the output of the root future is returned to
    /// the caller.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
111
/// Interface that any task scheduler must implement to spawn tasks.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Enqueue a task to be executed.
    ///
    /// Unlike a future, a spawned task will start executing immediately (even if the caller
    /// does not await the handle).
    ///
    /// Spawned tasks consume the context used to create them. This ensures that context cannot
    /// be shared between tasks and that a task's context always comes from somewhere.
    ///
    /// `f` receives the consumed context and returns the future to run; the returned
    /// [Handle] can be awaited to obtain the task's output.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a task to be executed (without consuming the context).
    ///
    /// The semantics are the same as [Spawner::spawn].
    ///
    /// Rather than taking the future directly, this method returns a closure that
    /// accepts the future and yields its [Handle].
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
    /// to prevent accidental misuse.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed.
    ///
    /// This method is designed for synchronous, potentially long-running operations. Tasks can either
    /// be executed in a shared thread (tasks that are expected to finish on their own) or a dedicated
    /// thread (tasks that are expected to run indefinitely).
    ///
    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
    /// result.
    ///
    /// # Motivation
    ///
    /// Most runtimes allocate a limited number of threads for executing async tasks, running whatever
    /// isn't waiting. If blocking tasks are spawned this way, they can dramatically reduce the efficiency
    /// of said runtimes.
    ///
    /// # Warning
    ///
    /// Blocking tasks cannot be aborted.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// Enqueue a blocking task to be executed (without consuming the context).
    ///
    /// The semantics are the same as [Spawner::spawn_blocking], except that the
    /// returned closure accepts a function that takes no context argument.
    ///
    /// # Warning
    ///
    /// If this function is used to spawn multiple tasks from the same context,
    /// the runtime will panic to prevent accidental misuse.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals the runtime to stop execution and that all outstanding tasks
    /// should perform any required cleanup and exit. This method is idempotent and
    /// can be called multiple times.
    ///
    /// This method does not actually kill any tasks but rather signals to them, using
    /// the `Signal` returned by `stopped`, that they should exit.
    ///
    /// The shared tests in this file expect `value` to be what tasks observe when
    /// the `Signal` resolves.
    fn stop(&self, value: i32);

    /// Returns an instance of a `Signal` that resolves when `stop` is called by
    /// any task.
    ///
    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
    fn stopped(&self) -> Signal;
}
193
194/// Interface to register and encode metrics.
195pub trait Metrics: Clone + Send + Sync + 'static {
196    /// Get the current label of the context.
197    fn label(&self) -> String;
198
199    /// Create a new instance of `Metrics` with the given label appended to the end
200    /// of the current `Metrics` label.
201    ///
202    /// This is commonly used to create a nested context for `register`.
203    ///
204    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
205    /// label (reserved for metrics for the runtime).
206    fn with_label(&self, label: &str) -> Self;
207
208    /// Prefix the given label with the current context's label.
209    ///
210    /// Unlike `with_label`, this method does not create a new context.
211    fn scoped_label(&self, label: &str) -> String {
212        let label = if self.label().is_empty() {
213            label.to_string()
214        } else {
215            format!("{}_{}", self.label(), label)
216        };
217        assert!(
218            !label.starts_with(METRICS_PREFIX),
219            "using runtime label is not allowed"
220        );
221        label
222    }
223
224    /// Register a metric with the runtime.
225    ///
226    /// Any registered metric will include (as a prefix) the label of the current context.
227    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
228
229    /// Encode all metrics into a buffer.
230    fn encode(&self) -> String;
231}
232
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    ///
    /// The returned future is `'static`, so it does not borrow the context.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    ///
    /// The returned future is `'static`, so it does not borrow the context.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
248
249/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
250pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
251
252/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
253pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
254
255/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
256pub type ListenerOf<N> = <N as crate::Network>::Listener;
257
/// Interface that any runtime must implement to create
/// network connections.
pub trait Network: Clone + Send + Sync + 'static {
    /// The type of [Listener] that's returned when binding to a socket.
    /// Accepting a connection returns a [Sink] and [Stream] which are defined
    /// by the [Listener] and used to send and receive data over the connection.
    type Listener: Listener;

    /// Bind to the given socket address.
    ///
    /// Resolves to a [Network::Listener] on success.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// On success, resolves to the same [Sink] and [Stream] types that the
    /// [Network::Listener] yields for accepted connections.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
278
/// Interface that any runtime must implement to handle
/// incoming network connections.
pub trait Listener: Sync + Send + 'static {
    /// The type of [Sink] that's returned when accepting a connection.
    /// This is used to send data to the remote connection.
    type Sink: Sink;
    /// The type of [Stream] that's returned when accepting a connection.
    /// This is used to receive data from the remote connection.
    type Stream: Stream;

    /// Accept an incoming connection.
    ///
    /// On success, resolves to a socket address (presumably the remote peer's —
    /// confirm against the implementations) together with the [Sink] and [Stream]
    /// for the connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the local address of the listener.
    ///
    /// Unlike the other methods, failures are reported as a raw `std::io::Error`.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
297
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// Takes ownership of `msg` for the duration of the send.
    fn send<B: StableBuf>(&mut self, msg: B) -> impl Future<Output = Result<(), Error>> + Send;
}
304
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer.
    ///
    /// Takes ownership of `buf` and hands it back on success.
    fn recv<B: StableBufMut>(&mut self, buf: B) -> impl Future<Output = Result<B, Error>> + Send;
}
312
/// Interface to interact with storage.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage: Clone + Send + Sync + 'static {
    /// The readable/writeable storage buffer that can be opened by this Storage.
    type Blob: Blob;

    /// Open an existing blob in a given partition or create a new one, returning
    /// the blob and its length.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return the names of all blobs in a given partition.
    ///
    /// The shared tests in this file expect `Error::PartitionMissing` when the
    /// partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
347
/// Interface to read and write to a blob.
///
/// To support blob implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Cloning a blob is similar to wrapping a single file descriptor in
/// a lock whereas opening a new blob (of the same name) is similar to
/// opening a new file descriptor. If multiple blobs are opened with the same
/// name, they are not expected to coordinate access to underlying storage
/// and writing to both is undefined behavior.
// NOTE(review): no `len` method is declared on this trait in the visible source, so
// this `allow` may be stale — confirm before removing.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Read from the blob at the given offset.
    ///
    /// `read_at` does not return the number of bytes read because it
    /// only returns once the entire buffer has been filled.
    ///
    /// Takes ownership of `buf` and hands it back on success. The shared tests in
    /// this file expect an error when the requested range extends past the end of
    /// the blob.
    fn read_at<B: StableBufMut>(
        &self,
        buf: B,
        offset: u64,
    ) -> impl Future<Output = Result<B, Error>> + Send;

    /// Write `buf` to the blob at the given offset.
    fn write_at<B: StableBuf>(
        &self,
        buf: B,
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Truncate the blob to the given length.
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Ensure all pending data is durably persisted.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Close the blob.
    ///
    /// Consumes this handle to the blob.
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390    use bytes::Bytes;
391    use commonware_macros::select;
392    use futures::channel::oneshot;
393    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
394    use prometheus_client::metrics::counter::Counter;
395    use std::collections::HashMap;
396    use std::panic::{catch_unwind, AssertUnwindSafe};
397    use std::str::FromStr;
398    use std::sync::Mutex;
399    use tracing::{error, Level};
400    use utils::reschedule;
401
402    fn test_error_future<R: Runner>(runner: R) {
403        async fn error_future() -> Result<&'static str, &'static str> {
404            Err("An error occurred")
405        }
406        let result = runner.start(|_| error_future());
407        assert_eq!(result, Err("An error occurred"));
408    }
409
410    fn test_clock_sleep<R: Runner>(runner: R)
411    where
412        R::Context: Spawner + Clock,
413    {
414        runner.start(|context| async move {
415            // Capture initial time
416            let start = context.current();
417            let sleep_duration = Duration::from_millis(10);
418            context.sleep(sleep_duration).await;
419
420            // After run, time should have advanced
421            let end = context.current();
422            assert!(end.duration_since(start).unwrap() >= sleep_duration);
423        });
424    }
425
426    fn test_clock_sleep_until<R: Runner>(runner: R)
427    where
428        R::Context: Spawner + Clock,
429    {
430        runner.start(|context| async move {
431            // Trigger sleep
432            let now = context.current();
433            context.sleep_until(now + Duration::from_millis(100)).await;
434
435            // Ensure slept duration has elapsed
436            let elapsed = now.elapsed().unwrap();
437            assert!(elapsed >= Duration::from_millis(100));
438        });
439    }
440
441    fn test_root_finishes<R: Runner>(runner: R)
442    where
443        R::Context: Spawner,
444    {
445        runner.start(|context| async move {
446            context.spawn(|_| async move {
447                loop {
448                    reschedule().await;
449                }
450            });
451        });
452    }
453
454    fn test_spawn_abort<R: Runner>(runner: R)
455    where
456        R::Context: Spawner,
457    {
458        runner.start(|context| async move {
459            let handle = context.spawn(|_| async move {
460                loop {
461                    reschedule().await;
462                }
463            });
464            handle.abort();
465            assert!(matches!(handle.await, Err(Error::Closed)));
466        });
467    }
468
469    fn test_panic_aborts_root<R: Runner>(runner: R) {
470        let result = catch_unwind(AssertUnwindSafe(|| {
471            runner.start(|_| async move {
472                panic!("blah");
473            });
474        }));
475        result.unwrap_err();
476    }
477
478    fn test_panic_aborts_spawn<R: Runner>(runner: R)
479    where
480        R::Context: Spawner,
481    {
482        let result = runner.start(|context| async move {
483            let result = context.spawn(|_| async move {
484                panic!("blah");
485            });
486            assert!(matches!(result.await, Err(Error::Exited)));
487            Result::<(), Error>::Ok(())
488        });
489
490        // Ensure panic was caught
491        result.unwrap();
492    }
493
494    fn test_select<R: Runner>(runner: R) {
495        runner.start(|_| async move {
496            // Test first branch
497            let output = Mutex::new(0);
498            select! {
499                v1 = ready(1) => {
500                    *output.lock().unwrap() = v1;
501                },
502                v2 = ready(2) => {
503                    *output.lock().unwrap() = v2;
504                },
505            };
506            assert_eq!(*output.lock().unwrap(), 1);
507
508            // Test second branch
509            select! {
510                v1 = std::future::pending::<i32>() => {
511                    *output.lock().unwrap() = v1;
512                },
513                v2 = ready(2) => {
514                    *output.lock().unwrap() = v2;
515                },
516            };
517            assert_eq!(*output.lock().unwrap(), 2);
518        });
519    }
520
521    /// Ensure future fusing works as expected.
522    fn test_select_loop<R: Runner>(runner: R)
523    where
524        R::Context: Clock,
525    {
526        runner.start(|context| async move {
527            // Should hit timeout
528            let (mut sender, mut receiver) = mpsc::unbounded();
529            for _ in 0..2 {
530                select! {
531                    v = receiver.next() => {
532                        panic!("unexpected value: {:?}", v);
533                    },
534                    _ = context.sleep(Duration::from_millis(100)) => {
535                        continue;
536                    },
537                };
538            }
539
540            // Populate channel
541            sender.send(0).await.unwrap();
542            sender.send(1).await.unwrap();
543
544            // Prefer not reading channel without losing messages
545            select! {
546                _ = async {} => {
547                    // Skip reading from channel even though populated
548                },
549                v = receiver.next() => {
550                    panic!("unexpected value: {:?}", v);
551                },
552            };
553
554            // Process messages
555            for i in 0..2 {
556                select! {
557                    _ = context.sleep(Duration::from_millis(100)) => {
558                        panic!("timeout");
559                    },
560                    v = receiver.next() => {
561                        assert_eq!(v.unwrap(), i);
562                    },
563                };
564            }
565        });
566    }
567
568    fn test_storage_operations<R: Runner>(runner: R)
569    where
570        R::Context: Storage,
571    {
572        runner.start(|context| async move {
573            let partition = "test_partition";
574            let name = b"test_blob";
575
576            // Open a new blob
577            let (blob, _) = context
578                .open(partition, name)
579                .await
580                .expect("Failed to open blob");
581
582            // Write data to the blob
583            let data = b"Hello, Storage!";
584            blob.write_at(Vec::from(data), 0)
585                .await
586                .expect("Failed to write to blob");
587
588            // Sync the blob
589            blob.sync().await.expect("Failed to sync blob");
590
591            // Read data from the blob
592            let read = blob
593                .read_at(vec![0; data.len()], 0)
594                .await
595                .expect("Failed to read from blob");
596            assert_eq!(read, data);
597
598            // Close the blob
599            blob.close().await.expect("Failed to close blob");
600
601            // Scan blobs in the partition
602            let blobs = context
603                .scan(partition)
604                .await
605                .expect("Failed to scan partition");
606            assert!(blobs.contains(&name.to_vec()));
607
608            // Reopen the blob
609            let (blob, len) = context
610                .open(partition, name)
611                .await
612                .expect("Failed to reopen blob");
613            assert_eq!(len, data.len() as u64);
614
615            // Read data part of message back
616            let read = blob
617                .read_at(vec![0u8; 7], 7)
618                .await
619                .expect("Failed to read data");
620            assert_eq!(read, b"Storage");
621
622            // Close the blob
623            blob.close().await.expect("Failed to close blob");
624
625            // Remove the blob
626            context
627                .remove(partition, Some(name))
628                .await
629                .expect("Failed to remove blob");
630
631            // Ensure the blob is removed
632            let blobs = context
633                .scan(partition)
634                .await
635                .expect("Failed to scan partition");
636            assert!(!blobs.contains(&name.to_vec()));
637
638            // Remove the partition
639            context
640                .remove(partition, None)
641                .await
642                .expect("Failed to remove partition");
643
644            // Scan the partition
645            let result = context.scan(partition).await;
646            assert!(matches!(result, Err(Error::PartitionMissing(_))));
647        });
648    }
649
650    fn test_blob_read_write<R: Runner>(runner: R)
651    where
652        R::Context: Storage,
653    {
654        runner.start(|context| async move {
655            let partition = "test_partition";
656            let name = b"test_blob_rw";
657
658            // Open a new blob
659            let (blob, _) = context
660                .open(partition, name)
661                .await
662                .expect("Failed to open blob");
663
664            // Write data at different offsets
665            let data1 = b"Hello";
666            let data2 = b"World";
667            blob.write_at(Vec::from(data1), 0)
668                .await
669                .expect("Failed to write data1");
670            blob.write_at(Vec::from(data2), 5)
671                .await
672                .expect("Failed to write data2");
673
674            // Read data back
675            let read = blob
676                .read_at(vec![0u8; 10], 0)
677                .await
678                .expect("Failed to read data");
679            assert_eq!(&read[..5], data1);
680            assert_eq!(&read[5..], data2);
681
682            // Rewrite data without affecting length
683            let data3 = b"Store";
684            blob.write_at(Vec::from(data3), 5)
685                .await
686                .expect("Failed to write data3");
687
688            // Truncate the blob
689            blob.truncate(5).await.expect("Failed to truncate blob");
690            let read = blob
691                .read_at(vec![0; 5], 0)
692                .await
693                .expect("Failed to read data");
694            assert_eq!(&read[..5], data1);
695
696            // Full read after truncation
697            let result = blob.read_at(vec![0u8; 10], 0).await;
698            assert!(result.is_err());
699
700            // Close the blob
701            blob.close().await.expect("Failed to close blob");
702        });
703    }
704
705    fn test_many_partition_read_write<R: Runner>(runner: R)
706    where
707        R::Context: Storage,
708    {
709        runner.start(|context| async move {
710            let partitions = ["partition1", "partition2", "partition3"];
711            let name = b"test_blob_rw";
712            let data1 = b"Hello";
713            let data2 = b"World";
714
715            for (additional, partition) in partitions.iter().enumerate() {
716                // Open a new blob
717                let (blob, _) = context
718                    .open(partition, name)
719                    .await
720                    .expect("Failed to open blob");
721
722                // Write data at different offsets
723                blob.write_at(Vec::from(data1), 0)
724                    .await
725                    .expect("Failed to write data1");
726                blob.write_at(Vec::from(data2), 5 + additional as u64)
727                    .await
728                    .expect("Failed to write data2");
729
730                // Close the blob
731                blob.close().await.expect("Failed to close blob");
732            }
733
734            for (additional, partition) in partitions.iter().enumerate() {
735                // Open a new blob
736                let (blob, len) = context
737                    .open(partition, name)
738                    .await
739                    .expect("Failed to open blob");
740                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
741
742                // Read data back
743                let read = blob
744                    .read_at(vec![0u8; 10 + additional], 0)
745                    .await
746                    .expect("Failed to read data");
747                assert_eq!(&read[..5], b"Hello");
748                assert_eq!(&read[5 + additional..], b"World");
749
750                // Close the blob
751                blob.close().await.expect("Failed to close blob");
752            }
753        });
754    }
755
756    fn test_blob_read_past_length<R: Runner>(runner: R)
757    where
758        R::Context: Storage,
759    {
760        runner.start(|context| async move {
761            let partition = "test_partition";
762            let name = b"test_blob_rw";
763
764            // Open a new blob
765            let (blob, _) = context
766                .open(partition, name)
767                .await
768                .expect("Failed to open blob");
769
770            // Read data past file length (empty file)
771            let result = blob.read_at(vec![0u8; 10], 0).await;
772            assert!(result.is_err());
773
774            // Write data to the blob
775            let data = b"Hello, Storage!".to_vec();
776            blob.write_at(data, 0)
777                .await
778                .expect("Failed to write to blob");
779
780            // Read data past file length (non-empty file)
781            let result = blob.read_at(vec![0u8; 20], 0).await;
782            assert!(result.is_err());
783        })
784    }
785
786    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
787    where
788        R::Context: Spawner + Storage + Metrics,
789    {
790        runner.start(|context| async move {
791            let partition = "test_partition";
792            let name = b"test_blob_rw";
793
794            // Open a new blob
795            let (blob, _) = context
796                .open(partition, name)
797                .await
798                .expect("Failed to open blob");
799
800            // Write data to the blob
801            let data = b"Hello, Storage!";
802            blob.write_at(Vec::from(data), 0)
803                .await
804                .expect("Failed to write to blob");
805
806            // Sync the blob
807            blob.sync().await.expect("Failed to sync blob");
808
809            // Read data from the blob in clone
810            let check1 = context.with_label("check1").spawn({
811                let blob = blob.clone();
812                move |_| async move {
813                    let read = blob
814                        .read_at(vec![0u8; data.len()], 0)
815                        .await
816                        .expect("Failed to read from blob");
817                    assert_eq!(read, data);
818                }
819            });
820            let check2 = context.with_label("check2").spawn({
821                let blob = blob.clone();
822                move |_| async move {
823                    let read = blob
824                        .read_at(vec![0; data.len()], 0)
825                        .await
826                        .expect("Failed to read from blob");
827                    assert_eq!(read, data);
828                }
829            });
830
831            // Wait for both reads to complete
832            let result = join!(check1, check2);
833            assert!(result.0.is_ok());
834            assert!(result.1.is_ok());
835
836            // Read data from the blob
837            let read = blob
838                .read_at(vec![0; data.len()], 0)
839                .await
840                .expect("Failed to read from blob");
841            assert_eq!(read, data);
842
843            // Close the blob
844            blob.close().await.expect("Failed to close blob");
845
846            // Ensure no blobs still open
847            let buffer = context.encode();
848            assert!(buffer.contains("open_blobs 0"));
849        });
850    }
851
852    fn test_shutdown<R: Runner>(runner: R)
853    where
854        R::Context: Spawner + Metrics + Clock,
855    {
856        let kill = 9;
857        runner.start(|context| async move {
858            // Spawn a task that waits for signal
859            let before = context
860                .with_label("before")
861                .spawn(move |context| async move {
862                    let sig = context.stopped().await;
863                    assert_eq!(sig.unwrap(), kill);
864                });
865
866            // Spawn a task after stop is called
867            let after = context
868                .with_label("after")
869                .spawn(move |context| async move {
870                    // Wait for stop signal
871                    let mut signal = context.stopped();
872                    loop {
873                        select! {
874                            sig = &mut signal => {
875                                // Stopper resolved
876                                assert_eq!(sig.unwrap(), kill);
877                                break;
878                            },
879                            _ = context.sleep(Duration::from_millis(10)) => {
880                                // Continue waiting
881                            },
882                        }
883                    }
884                });
885
886            // Sleep for a bit before stopping
887            context.sleep(Duration::from_millis(50)).await;
888
889            // Signal the task
890            context.stop(kill);
891
892            // Ensure both tasks complete
893            let result = join!(before, after);
894            assert!(result.0.is_ok());
895            assert!(result.1.is_ok());
896        });
897    }
898
899    fn test_spawn_ref<R: Runner>(runner: R)
900    where
901        R::Context: Spawner,
902    {
903        runner.start(|mut context| async move {
904            let handle = context.spawn_ref();
905            let result = handle(async move { 42 }).await;
906            assert!(matches!(result, Ok(42)));
907        });
908    }
909
910    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
911    where
912        R::Context: Spawner,
913    {
914        runner.start(|mut context| async move {
915            let handle = context.spawn_ref();
916            let result = handle(async move { 42 }).await;
917            assert!(matches!(result, Ok(42)));
918
919            // Ensure context is consumed
920            let handle = context.spawn_ref();
921            let result = handle(async move { 42 }).await;
922            assert!(matches!(result, Ok(42)));
923        });
924    }
925
926    fn test_spawn_duplicate<R: Runner>(runner: R)
927    where
928        R::Context: Spawner,
929    {
930        runner.start(|mut context| async move {
931            let handle = context.spawn_ref();
932            let result = handle(async move { 42 }).await;
933            assert!(matches!(result, Ok(42)));
934
935            // Ensure context is consumed
936            context.spawn(|_| async move { 42 });
937        });
938    }
939
940    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
941    where
942        R::Context: Spawner,
943    {
944        runner.start(|context| async move {
945            let handle = context.spawn_blocking(dedicated, |_| 42);
946            let result = handle.await;
947            assert!(matches!(result, Ok(42)));
948        });
949    }
950
951    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
952    where
953        R::Context: Spawner,
954    {
955        runner.start(|mut context| async move {
956            let spawn = context.spawn_blocking_ref(dedicated);
957            let handle = spawn(|| 42);
958            let result = handle.await;
959            assert!(matches!(result, Ok(42)));
960        });
961    }
962
963    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
964    where
965        R::Context: Spawner,
966    {
967        runner.start(|mut context| async move {
968            let spawn = context.spawn_blocking_ref(dedicated);
969            let result = spawn(|| 42).await;
970            assert!(matches!(result, Ok(42)));
971
972            // Ensure context is consumed
973            context.spawn_blocking(dedicated, |_| 42);
974        });
975    }
976
977    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
978    where
979        R::Context: Spawner,
980    {
981        runner.start(|context| async move {
982            // Create task
983            let (sender, mut receiver) = oneshot::channel();
984            let handle = context.spawn_blocking(dedicated, move |_| {
985                // Wait for abort to be called
986                loop {
987                    if receiver.try_recv().is_ok() {
988                        break;
989                    }
990                }
991
992                // Perform a long-running operation
993                let mut count = 0;
994                loop {
995                    count += 1;
996                    if count >= 100_000_000 {
997                        break;
998                    }
999                }
1000                count
1001            });
1002
1003            // Abort the task
1004            //
1005            // If there was an `.await` prior to sending a message over the oneshot, this test
1006            // could deadlock (depending on the runtime implementation) because the blocking task
1007            // would never yield (preventing send from being called).
1008            handle.abort();
1009            sender.send(()).unwrap();
1010
1011            // Wait for the task to complete
1012            assert!(matches!(handle.await, Ok(100_000_000)));
1013        });
1014    }
1015
1016    fn test_metrics<R: Runner>(runner: R)
1017    where
1018        R::Context: Metrics,
1019    {
1020        runner.start(|context| async move {
1021            // Assert label
1022            assert_eq!(context.label(), "");
1023
1024            // Register a metric
1025            let counter = Counter::<u64>::default();
1026            context.register("test", "test", counter.clone());
1027
1028            // Increment the counter
1029            counter.inc();
1030
1031            // Encode metrics
1032            let buffer = context.encode();
1033            assert!(buffer.contains("test_total 1"));
1034
1035            // Nested context
1036            let context = context.with_label("nested");
1037            let nested_counter = Counter::<u64>::default();
1038            context.register("test", "test", nested_counter.clone());
1039
1040            // Increment the counter
1041            nested_counter.inc();
1042
1043            // Encode metrics
1044            let buffer = context.encode();
1045            assert!(buffer.contains("nested_test_total 1"));
1046            assert!(buffer.contains("test_total 1"));
1047        });
1048    }
1049
1050    fn test_metrics_label<R: Runner>(runner: R)
1051    where
1052        R::Context: Metrics,
1053    {
1054        runner.start(|context| async move {
1055            context.with_label(METRICS_PREFIX);
1056        })
1057    }
1058
/// Runs `test_error_future` against the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1064
/// Runs `test_clock_sleep` against the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1070
/// Runs `test_clock_sleep_until` against the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1076
/// Runs `test_root_finishes` against the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1082
/// Runs `test_spawn_abort` against the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1088
/// Runs `test_panic_aborts_root` against the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1094
/// Runs `test_panic_aborts_spawn` against the deterministic runtime and expects
/// it to panic with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1101
/// Runs `test_select` against the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1107
/// Runs `test_select_loop` against the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1113
/// Runs `test_storage_operations` against the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1119
/// Runs `test_blob_read_write` against the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1125
/// Runs `test_many_partition_read_write` against the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1131
/// Runs `test_blob_read_past_length` against the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1137
/// Runs `test_blob_clone_and_concurrent_read` against the deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1144
/// Runs `test_shutdown` against the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1150
/// Runs `test_spawn_ref` against the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1156
/// Runs `test_spawn_ref_duplicate` against the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1163
/// Runs `test_spawn_duplicate` against the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1170
/// Runs `test_spawn_blocking` against a fresh deterministic runtime for each
/// value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    }
}
1178
/// Confirms that a panic inside a blocking task propagates out of the
/// deterministic runtime for both `dedicated = false` and `dedicated = true`.
///
/// Each iteration runs under `catch_unwind` rather than relying on
/// `#[should_panic]`: with that attribute the test ends at the first panic, so
/// the second value of `dedicated` would never be exercised.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        // The runner is created inside the closure so nothing but `dedicated`
        // crosses the unwind boundary.
        let outcome = std::panic::catch_unwind(|| {
            let executor = deterministic::Runner::default();
            executor.start(|context| async move {
                let handle = context.spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                });
                handle.await.unwrap();
            });
        });

        // A panic must have escaped `start`.
        let payload = match outcome {
            Ok(()) => panic!("expected a panic with dedicated = {}", dedicated),
            Err(payload) => payload,
        };

        // Panic payloads created by `panic!` are either `&str` or `String`;
        // check the message the same way `#[should_panic(expected = ..)]` would.
        let message = payload
            .downcast_ref::<&str>()
            .copied()
            .or_else(|| payload.downcast_ref::<String>().map(String::as_str))
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message with dedicated = {}: {:?}",
            dedicated,
            message
        );
    }
}
1192
/// Runs `test_spawn_blocking_abort` against a fresh deterministic runtime for
/// each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    for dedicated in [false, true] {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated);
    }
}
1200
/// Runs `test_spawn_blocking_ref` against a fresh deterministic runtime for
/// each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated);
    }
}
1208
/// Confirms that `test_spawn_blocking_ref_duplicate` panics on the deterministic
/// runtime for both `dedicated = false` and `dedicated = true`.
///
/// Each iteration runs under `catch_unwind` rather than relying on
/// `#[should_panic]`: with that attribute the test ends at the first panic, so
/// the second value of `dedicated` would never be exercised.
#[test]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        // The runner is created inside the closure so nothing but `dedicated`
        // crosses the unwind boundary.
        let outcome = std::panic::catch_unwind(|| {
            let executor = deterministic::Runner::default();
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        });
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated = {}",
            dedicated
        );
    }
}
1217
/// Runs `test_metrics` against the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1223
/// Runs `test_metrics_label` against the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1230
/// Runs `test_error_future` against the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1236
/// Runs `test_clock_sleep` against the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1242
/// Runs `test_clock_sleep_until` against the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1248
/// Runs `test_root_finishes` against the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1254
/// Runs `test_spawn_abort` against the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1260
/// Runs `test_panic_aborts_root` against the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1266
/// Runs `test_panic_aborts_spawn` against the tokio runtime.
///
/// Unlike the deterministic variant, this test is not marked `#[should_panic]`.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1272
/// Runs `test_select` against the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1278
/// Runs `test_select_loop` against the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1284
/// Runs `test_storage_operations` against the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1290
/// Runs `test_blob_read_write` against the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1296
/// Runs `test_many_partition_read_write` against the tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1302
/// Runs `test_blob_read_past_length` against the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1308
/// Runs `test_blob_clone_and_concurrent_read` against the tokio runtime.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1315
/// Runs `test_shutdown` against the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1321
/// Runs `test_spawn_ref` against the tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1327
/// Runs `test_spawn_ref_duplicate` against the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1334
/// Runs `test_spawn_duplicate` against the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1341
/// Runs `test_spawn_blocking` against a fresh tokio runtime for each value of
/// `dedicated`.
#[test]
fn test_tokio_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    }
}
1349
/// Checks that, on the tokio runtime, a blocking task that panics makes its
/// handle resolve to `Err(Error::Exited)` for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        tokio::Runner::default().start(|context| async move {
            let outcome = context
                .spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                })
                .await;
            assert!(matches!(outcome, Err(Error::Exited)));
        });
    }
}
1363
/// Runs `test_spawn_blocking_abort` against a fresh tokio runtime for each
/// value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    for dedicated in [false, true] {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    }
}
1371
/// Runs `test_spawn_blocking_ref` against a fresh tokio runtime for each value
/// of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    }
}
1379
/// Confirms that `test_spawn_blocking_ref_duplicate` panics on the tokio runtime
/// for both `dedicated = false` and `dedicated = true`.
///
/// Each iteration runs under `catch_unwind` rather than relying on
/// `#[should_panic]`: with that attribute the test ends at the first panic, so
/// the second value of `dedicated` would never be exercised.
#[test]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        // The runner is created inside the closure so nothing but `dedicated`
        // crosses the unwind boundary.
        let outcome = std::panic::catch_unwind(|| {
            let executor = tokio::Runner::default();
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        });
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated = {}",
            dedicated
        );
    }
}
1388
/// Runs `test_metrics` against the tokio runtime.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1394
/// Runs `test_metrics_label` against the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1401
/// Initializes tokio telemetry with a server address of `127.0.0.1:8000`, then
/// issues a raw `GET /metrics` request from a spawned client task and checks
/// that a registered counter appears in the response body.
#[test]
fn test_tokio_telemetry() {
    tokio::Runner::default().start(|context| async move {
        // Address handed to telemetry and dialed by the client below.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Set up telemetry with INFO-level, non-JSON logging.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and bump it so there is something to look for.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads one byte at a time up to and including `\n`, returning the
        /// line without its `\n` (or `\r\n`) terminator.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut bytes = Vec::new();
            loop {
                let chunk = stream.recv(vec![0; 1]).await?;
                let byte = chunk[0];
                if byte != b'\n' {
                    bytes.push(byte);
                    continue;
                }

                // End of line: drop a trailing carriage return, if any.
                if bytes.last() == Some(&b'\r') {
                    bytes.pop();
                }
                break;
            }
            String::from_utf8(bytes).map_err(|_| Error::ReadFailed)
        }

        /// Reads lines until an empty one, collecting every `name: value` pair.
        /// Lines without a `": "` separator are ignored.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_string(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Receives `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let raw = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(raw).map_err(|_| Error::ReadFailed)
        }

        // Client task: connect, request the metrics page, and validate the reply.
        let client = context.with_label("client").spawn(move |context| async move {
            // Keep dialing every 10ms; the client may run before the server is ready.
            let (mut sink, mut stream) = loop {
                match context.dial(address).await {
                    Ok(connection) => break connection,
                    Err(e) => {
                        error!(err = ?e, "failed to connect");
                        context.sleep(Duration::from_millis(10)).await;
                    }
                }
            };

            // Issue the GET request.
            let request = format!(
                "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
                address
            );
            sink.send(Bytes::from(request)).await.unwrap();

            // The status line must report success.
            let status_line = read_line(&mut stream).await.unwrap();
            assert_eq!(status_line, "HTTP/1.1 200 OK");

            // Parse the headers to learn how long the body is.
            let headers = read_headers(&mut stream).await.unwrap();
            println!("Headers: {:?}", headers);
            let raw_length = headers.get("content-length").unwrap();
            let content_length = raw_length.parse::<usize>().unwrap();

            // The body must contain the registered counter.
            let body = read_body(&mut stream, content_length).await.unwrap();
            assert!(body.contains("test_counter_total 1"));
        });

        // Propagate any failure from the client task.
        client.await.unwrap();
    });
}
1510}