commonware_runtime/
lib.rs

1//! Execute asynchronous tasks with a configurable scheduler.
2//!
3//! This crate provides a collection of runtimes that can be
4//! used to execute asynchronous tasks in a variety of ways. For production use,
5//! the `tokio` module provides a runtime backed by [Tokio](https://tokio.rs).
6//! For testing and simulation, the `deterministic` module provides a runtime
7//! that allows for deterministic execution of tasks (given a fixed seed).
8//!
9//! # Terminology
10//!
11//! Each runtime is typically composed of an `Executor` and a `Context`. The `Executor` implements the
12//! `Runner` trait and drives execution of a runtime. The `Context` implements any number of the
13//! other traits to provide core functionality.
14//!
15//! # Status
16//!
17//! `commonware-runtime` is **ALPHA** software and is not yet recommended for production use. Developers should
18//! expect breaking changes and occasional instability.
19
20use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23    future::Future,
24    io::Error as IoError,
25    net::SocketAddr,
26    time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36    if #[cfg(not(target_arch = "wasm32"))] {
37        pub mod tokio;
38        pub mod benchmarks;
39    }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
/// Prefix reserved for metrics emitted by the runtime itself.
///
/// [Metrics::scoped_label] asserts that no label it produces starts with this value.
const METRICS_PREFIX: &str = "runtime";
51
/// Errors that can occur when interacting with the runtime.
///
/// Variants carrying `String` fields interpolate them into the error message. The
/// blob variants render their first two fields as `{0}/{1}` (presumably partition
/// then blob name -- confirm against the storage implementations).
#[derive(Error, Debug)]
pub enum Error {
    /// Rendered as "exited". The runtime tests expect this when awaiting the handle of a
    /// spawned task that panicked.
    #[error("exited")]
    Exited,
    /// Rendered as "closed". The runtime tests expect this when awaiting the handle of a
    /// task that was aborted.
    #[error("closed")]
    Closed,
    /// Rendered as "timeout".
    #[error("timeout")]
    Timeout,
    /// Rendered as "bind failed".
    #[error("bind failed")]
    BindFailed,
    /// Rendered as "connection failed".
    #[error("connection failed")]
    ConnectionFailed,
    /// Rendered as "write failed".
    #[error("write failed")]
    WriteFailed,
    /// Rendered as "read failed".
    #[error("read failed")]
    ReadFailed,
    /// Rendered as "send failed".
    #[error("send failed")]
    SendFailed,
    /// Rendered as "recv failed".
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the field is included in the message.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist; the field is included in the message. The runtime tests
    /// expect this when scanning a partition that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt; the field is included in the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed with the wrapped I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Resizing a blob failed with the wrapped I/O error.
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    /// Syncing a blob failed with the wrapped I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed with the wrapped I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// Rendered as "blob insufficient length".
    // NOTE(review): presumably returned when a read extends past the end of a blob -- confirm.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// Rendered as "offset overflow".
    #[error("offset overflow")]
    OffsetOverflow,
    /// Any other I/O error. `#[from]` generates `From<std::io::Error>`, so `?` converts
    /// an I/O error into this variant automatically.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
96
/// Entry point of a runtime: every task scheduler implements this trait so
/// that callers can hand it the first (root) task to run.
pub trait Runner {
    /// Environment made available to tasks.
    ///
    /// Services a context may provide include:
    /// - [Clock] for time-based operations
    /// - [Network] for network operations
    /// - [Storage] for storage operations
    type Context;

    /// Start running a root task.
    ///
    /// `f` is called with the runtime's [Runner::Context] and returns the root future.
    /// The output of that future is what `start` returns.
    fn start<F: FnOnce(Self::Context) -> Fut, Fut: Future>(self, f: F) -> Fut::Output;
}
113
114/// Interface that any task scheduler must implement to spawn tasks.
115pub trait Spawner: Clone + Send + Sync + 'static {
116    /// Enqueue a task to be executed.
117    ///
118    /// Unlike a future, a spawned task will start executing immediately (even if the caller
119    /// does not await the handle).
120    ///
121    /// Spawned tasks consume the context used to create them. This ensures that context cannot
122    /// be shared between tasks and that a task's context always comes from somewhere.
123    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
124    where
125        F: FnOnce(Self) -> Fut + Send + 'static,
126        Fut: Future<Output = T> + Send + 'static,
127        T: Send + 'static;
128
129    /// Enqueue a task to be executed (without consuming the context).
130    ///
131    /// The semantics are the same as [Spawner::spawn].
132    ///
133    /// # Warning
134    ///
135    /// If this function is used to spawn multiple tasks from the same context, the runtime will panic
136    /// to prevent accidental misuse.
137    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
138    where
139        F: Future<Output = T> + Send + 'static,
140        T: Send + 'static;
141
142    /// Enqueue a blocking task to be executed.
143    ///
144    /// This method is designed for synchronous, potentially long-running operations. Tasks can either
145    /// be executed in a shared thread (tasks that are expected to finish on their own) or a dedicated
146    /// thread (tasks that are expected to run indefinitely).
147    ///
148    /// The task starts executing immediately, and the returned handle can be awaited to retrieve the
149    /// result.
150    ///
151    /// # Motivation
152    ///
153    /// Most runtimes allocate a limited number of threads for executing async tasks, running whatever
154    /// isn't waiting. If blocking tasks are spawned this way, they can dramatically reduce the efficiency
155    /// of said runtimes.
156    ///
157    /// # Warning
158    ///
159    /// Blocking tasks cannot be aborted.
160    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
161    where
162        F: FnOnce(Self) -> T + Send + 'static,
163        T: Send + 'static;
164
165    /// Enqueue a blocking task to be executed (without consuming the context).
166    ///
167    /// The semantics are the same as [Spawner::spawn_blocking].
168    ///
169    /// # Warning
170    ///
171    /// If this function is used to spawn multiple tasks from the same context,
172    /// the runtime will panic to prevent accidental misuse.
173    fn spawn_blocking_ref<F, T>(
174        &mut self,
175        dedicated: bool,
176    ) -> impl FnOnce(F) -> Handle<T> + 'static
177    where
178        F: FnOnce() -> T + Send + 'static,
179        T: Send + 'static;
180
181    /// Signals the runtime to stop execution and that all outstanding tasks
182    /// should perform any required cleanup and exit. This method is idempotent and
183    /// can be called multiple times.
184    ///
185    /// This method does not actually kill any tasks but rather signals to them, using
186    /// the `Signal` returned by `stopped`, that they should exit.
187    fn stop(&self, value: i32);
188
189    /// Returns an instance of a `Signal` that resolves when `stop` is called by
190    /// any task.
191    ///
192    /// If `stop` has already been called, the returned `Signal` will resolve immediately.
193    fn stopped(&self) -> Signal;
194}
195
196/// Interface to register and encode metrics.
197pub trait Metrics: Clone + Send + Sync + 'static {
198    /// Get the current label of the context.
199    fn label(&self) -> String;
200
201    /// Create a new instance of `Metrics` with the given label appended to the end
202    /// of the current `Metrics` label.
203    ///
204    /// This is commonly used to create a nested context for `register`.
205    ///
206    /// It is not permitted for any implementation to use `METRICS_PREFIX` as the start of a
207    /// label (reserved for metrics for the runtime).
208    fn with_label(&self, label: &str) -> Self;
209
210    /// Prefix the given label with the current context's label.
211    ///
212    /// Unlike `with_label`, this method does not create a new context.
213    fn scoped_label(&self, label: &str) -> String {
214        let label = if self.label().is_empty() {
215            label.to_string()
216        } else {
217            format!("{}_{}", self.label(), label)
218        };
219        assert!(
220            !label.starts_with(METRICS_PREFIX),
221            "using runtime label is not allowed"
222        );
223        label
224    }
225
226    /// Register a metric with the runtime.
227    ///
228    /// Any registered metric will include (as a prefix) the label of the current context.
229    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
230
231    /// Encode all metrics into a buffer.
232    fn encode(&self) -> String;
233}
234
/// Interface that any task scheduler must implement to provide
/// time-based operations.
///
/// It is necessary to mock time to provide deterministic execution
/// of arbitrary tasks.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as reported by the runtime.
    ///
    /// Because time may be mocked, callers should compare values returned by this
    /// method with each other rather than with the system clock.
    fn current(&self) -> SystemTime;

    /// Sleep for the given duration.
    ///
    /// The returned future is `Send + 'static`, so it does not borrow from `self`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Sleep until the given deadline.
    ///
    /// The returned future is `Send + 'static`, so it does not borrow from `self`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
250
251/// Syntactic sugar for the type of [Sink] used by a given [Network] N.
252pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
253
254/// Syntactic sugar for the type of [Stream] used by a given [Network] N.
255pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
256
257/// Syntactic sugar for the type of [Listener] used by a given [Network] N.
258pub type ListenerOf<N> = <N as crate::Network>::Listener;
259
/// Interface that any runtime must implement to create
/// network connections.
///
/// Connections can be established in two ways: inbound, by binding a [Listener] and
/// accepting on it, or outbound, by dialing a remote address.
pub trait Network: Clone + Send + Sync + 'static {
    /// The type of [Listener] that's returned when binding to a socket.
    /// Accepting a connection returns a [Sink] and [Stream] which are defined
    /// by the [Listener] and used to send and receive data over the connection.
    type Listener: Listener;

    /// Bind to the given socket address.
    ///
    /// Resolves to a [Listener] on success.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dial the given socket address.
    ///
    /// Resolves to the [Sink] and [Stream] halves of the new connection on success. These
    /// are the same types that the network's [Listener] produces when accepting.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
280
281/// Interface that any runtime must implement to handle
282/// incoming network connections.
283pub trait Listener: Sync + Send + 'static {
284    /// The type of [Sink] that's returned when accepting a connection.
285    /// This is used to send data to the remote connection.
286    type Sink: Sink;
287    /// The type of [Stream] that's returned when accepting a connection.
288    /// This is used to receive data from the remote connection.
289    type Stream: Stream;
290
291    /// Accept an incoming connection.
292    fn accept(
293        &mut self,
294    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
295
296    /// Returns the local address of the listener.
297    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
298}
299
/// Interface that any runtime must implement to send
/// messages over a network connection.
pub trait Sink: Sync + Send + 'static {
    /// Send a message to the sink.
    ///
    /// `msg` is taken by value and converted into a [StableBuf]. The returned future
    /// resolves to `Ok(())` on success.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
309
/// Interface that any runtime must implement to receive
/// messages over a network connection.
pub trait Stream: Sync + Send + 'static {
    /// Receive a message from the stream, storing it in the given buffer.
    /// Reads exactly the number of bytes that fit in the buffer.
    ///
    /// `buf` is taken by value and a [StableBuf] is handed back on success
    /// (presumably the same buffer, now filled -- confirm against implementations).
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
320
/// Interface to interact with storage.
///
/// Blobs are grouped into named partitions and addressed by a byte-string name
/// within their partition.
///
/// To support storage implementations that enable concurrent reads and
/// writes, blobs are responsible for maintaining synchronization.
///
/// Storage can be backed by a local filesystem, cloud storage, etc.
pub trait Storage: Clone + Send + Sync + 'static {
    /// The readable/writeable storage buffer that can be opened by this Storage.
    type Blob: Blob;

    /// Open an existing blob in a given partition or create a new one, returning
    /// the blob and its length.
    ///
    /// Multiple instances of the same blob can be opened concurrently, however,
    /// writing to the same blob concurrently may lead to undefined behavior.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Remove a blob from a given partition.
    ///
    /// If no `name` is provided, the entire partition is removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Return the names of all blobs in a given partition.
    ///
    /// The runtime tests expect `Error::PartitionMissing` when the partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
355
356/// Interface to read and write to a blob.
357///
358/// To support blob implementations that enable concurrent reads and
359/// writes, blobs are responsible for maintaining synchronization.
360///
361/// Cloning a blob is similar to wrapping a single file descriptor in
362/// a lock whereas opening a new blob (of the same name) is similar to
363/// opening a new file descriptor. If multiple blobs are opened with the same
364/// name, they are not expected to coordinate access to underlying storage
365/// and writing to both is undefined behavior.
366#[allow(clippy::len_without_is_empty)]
367pub trait Blob: Clone + Send + Sync + 'static {
368    /// Read from the blob at the given offset.
369    ///
370    /// `read_at` does not return the number of bytes read because it
371    /// only returns once the entire buffer has been filled.
372    fn read_at(
373        &self,
374        buf: impl Into<StableBuf> + Send,
375        offset: u64,
376    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
377
378    /// Write `buf` to the blob at the given offset.
379    fn write_at(
380        &self,
381        buf: impl Into<StableBuf> + Send,
382        offset: u64,
383    ) -> impl Future<Output = Result<(), Error>> + Send;
384
385    /// Resize the blob to the given length.
386    ///
387    /// If the length is greater than the current length, the blob is extended with zeros.
388    /// If the length is less than the current length, the blob is resized.
389    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
390
391    /// Ensure all pending data is durably persisted.
392    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
393
394    /// Close the blob.
395    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use bytes::Bytes;
402    use commonware_macros::select;
403    use futures::{
404        channel::{mpsc, oneshot},
405        future::ready,
406        join, SinkExt, StreamExt,
407    };
408    use prometheus_client::metrics::counter::Counter;
409    use std::{
410        collections::HashMap,
411        panic::{catch_unwind, AssertUnwindSafe},
412        str::FromStr,
413        sync::Mutex,
414    };
415    use tracing::{error, Level};
416    use utils::reschedule;
417
418    fn test_error_future<R: Runner>(runner: R) {
419        async fn error_future() -> Result<&'static str, &'static str> {
420            Err("An error occurred")
421        }
422        let result = runner.start(|_| error_future());
423        assert_eq!(result, Err("An error occurred"));
424    }
425
426    fn test_clock_sleep<R: Runner>(runner: R)
427    where
428        R::Context: Spawner + Clock,
429    {
430        runner.start(|context| async move {
431            // Capture initial time
432            let start = context.current();
433            let sleep_duration = Duration::from_millis(10);
434            context.sleep(sleep_duration).await;
435
436            // After run, time should have advanced
437            let end = context.current();
438            assert!(end.duration_since(start).unwrap() >= sleep_duration);
439        });
440    }
441
442    fn test_clock_sleep_until<R: Runner>(runner: R)
443    where
444        R::Context: Spawner + Clock,
445    {
446        runner.start(|context| async move {
447            // Trigger sleep
448            let now = context.current();
449            context.sleep_until(now + Duration::from_millis(100)).await;
450
451            // Ensure slept duration has elapsed
452            let elapsed = now.elapsed().unwrap();
453            assert!(elapsed >= Duration::from_millis(100));
454        });
455    }
456
457    fn test_root_finishes<R: Runner>(runner: R)
458    where
459        R::Context: Spawner,
460    {
461        runner.start(|context| async move {
462            context.spawn(|_| async move {
463                loop {
464                    reschedule().await;
465                }
466            });
467        });
468    }
469
470    fn test_spawn_abort<R: Runner>(runner: R)
471    where
472        R::Context: Spawner,
473    {
474        runner.start(|context| async move {
475            let handle = context.spawn(|_| async move {
476                loop {
477                    reschedule().await;
478                }
479            });
480            handle.abort();
481            assert!(matches!(handle.await, Err(Error::Closed)));
482        });
483    }
484
485    fn test_panic_aborts_root<R: Runner>(runner: R) {
486        let result = catch_unwind(AssertUnwindSafe(|| {
487            runner.start(|_| async move {
488                panic!("blah");
489            });
490        }));
491        result.unwrap_err();
492    }
493
494    fn test_panic_aborts_spawn<R: Runner>(runner: R)
495    where
496        R::Context: Spawner,
497    {
498        let result = runner.start(|context| async move {
499            let result = context.spawn(|_| async move {
500                panic!("blah");
501            });
502            assert!(matches!(result.await, Err(Error::Exited)));
503            Result::<(), Error>::Ok(())
504        });
505
506        // Ensure panic was caught
507        result.unwrap();
508    }
509
510    fn test_select<R: Runner>(runner: R) {
511        runner.start(|_| async move {
512            // Test first branch
513            let output = Mutex::new(0);
514            select! {
515                v1 = ready(1) => {
516                    *output.lock().unwrap() = v1;
517                },
518                v2 = ready(2) => {
519                    *output.lock().unwrap() = v2;
520                },
521            };
522            assert_eq!(*output.lock().unwrap(), 1);
523
524            // Test second branch
525            select! {
526                v1 = std::future::pending::<i32>() => {
527                    *output.lock().unwrap() = v1;
528                },
529                v2 = ready(2) => {
530                    *output.lock().unwrap() = v2;
531                },
532            };
533            assert_eq!(*output.lock().unwrap(), 2);
534        });
535    }
536
537    /// Ensure future fusing works as expected.
538    fn test_select_loop<R: Runner>(runner: R)
539    where
540        R::Context: Clock,
541    {
542        runner.start(|context| async move {
543            // Should hit timeout
544            let (mut sender, mut receiver) = mpsc::unbounded();
545            for _ in 0..2 {
546                select! {
547                    v = receiver.next() => {
548                        panic!("unexpected value: {v:?}");
549                    },
550                    _ = context.sleep(Duration::from_millis(100)) => {
551                        continue;
552                    },
553                };
554            }
555
556            // Populate channel
557            sender.send(0).await.unwrap();
558            sender.send(1).await.unwrap();
559
560            // Prefer not reading channel without losing messages
561            select! {
562                _ = async {} => {
563                    // Skip reading from channel even though populated
564                },
565                v = receiver.next() => {
566                    panic!("unexpected value: {v:?}");
567                },
568            };
569
570            // Process messages
571            for i in 0..2 {
572                select! {
573                    _ = context.sleep(Duration::from_millis(100)) => {
574                        panic!("timeout");
575                    },
576                    v = receiver.next() => {
577                        assert_eq!(v.unwrap(), i);
578                    },
579                };
580            }
581        });
582    }
583
584    fn test_storage_operations<R: Runner>(runner: R)
585    where
586        R::Context: Storage,
587    {
588        runner.start(|context| async move {
589            let partition = "test_partition";
590            let name = b"test_blob";
591
592            // Open a new blob
593            let (blob, _) = context
594                .open(partition, name)
595                .await
596                .expect("Failed to open blob");
597
598            // Write data to the blob
599            let data = b"Hello, Storage!";
600            blob.write_at(Vec::from(data), 0)
601                .await
602                .expect("Failed to write to blob");
603
604            // Sync the blob
605            blob.sync().await.expect("Failed to sync blob");
606
607            // Read data from the blob
608            let read = blob
609                .read_at(vec![0; data.len()], 0)
610                .await
611                .expect("Failed to read from blob");
612            assert_eq!(read.as_ref(), data);
613
614            // Close the blob
615            blob.close().await.expect("Failed to close blob");
616
617            // Scan blobs in the partition
618            let blobs = context
619                .scan(partition)
620                .await
621                .expect("Failed to scan partition");
622            assert!(blobs.contains(&name.to_vec()));
623
624            // Reopen the blob
625            let (blob, len) = context
626                .open(partition, name)
627                .await
628                .expect("Failed to reopen blob");
629            assert_eq!(len, data.len() as u64);
630
631            // Read data part of message back
632            let read = blob
633                .read_at(vec![0u8; 7], 7)
634                .await
635                .expect("Failed to read data");
636            assert_eq!(read.as_ref(), b"Storage");
637
638            // Close the blob
639            blob.close().await.expect("Failed to close blob");
640
641            // Remove the blob
642            context
643                .remove(partition, Some(name))
644                .await
645                .expect("Failed to remove blob");
646
647            // Ensure the blob is removed
648            let blobs = context
649                .scan(partition)
650                .await
651                .expect("Failed to scan partition");
652            assert!(!blobs.contains(&name.to_vec()));
653
654            // Remove the partition
655            context
656                .remove(partition, None)
657                .await
658                .expect("Failed to remove partition");
659
660            // Scan the partition
661            let result = context.scan(partition).await;
662            assert!(matches!(result, Err(Error::PartitionMissing(_))));
663        });
664    }
665
666    fn test_blob_read_write<R: Runner>(runner: R)
667    where
668        R::Context: Storage,
669    {
670        runner.start(|context| async move {
671            let partition = "test_partition";
672            let name = b"test_blob_rw";
673
674            // Open a new blob
675            let (blob, _) = context
676                .open(partition, name)
677                .await
678                .expect("Failed to open blob");
679
680            // Write data at different offsets
681            let data1 = b"Hello";
682            let data2 = b"World";
683            blob.write_at(Vec::from(data1), 0)
684                .await
685                .expect("Failed to write data1");
686            blob.write_at(Vec::from(data2), 5)
687                .await
688                .expect("Failed to write data2");
689
690            // Read data back
691            let read = blob
692                .read_at(vec![0u8; 10], 0)
693                .await
694                .expect("Failed to read data");
695            assert_eq!(&read.as_ref()[..5], data1);
696            assert_eq!(&read.as_ref()[5..], data2);
697
698            // Read past end of blob
699            let result = blob.read_at(vec![0u8; 10], 10).await;
700            assert!(result.is_err());
701
702            // Rewrite data without affecting length
703            let data3 = b"Store";
704            blob.write_at(Vec::from(data3), 5)
705                .await
706                .expect("Failed to write data3");
707
708            // Read data back
709            let read = blob
710                .read_at(vec![0u8; 10], 0)
711                .await
712                .expect("Failed to read data");
713            assert_eq!(&read.as_ref()[..5], data1);
714            assert_eq!(&read.as_ref()[5..], data3);
715
716            // Read past end of blob
717            let result = blob.read_at(vec![0u8; 10], 10).await;
718            assert!(result.is_err());
719        });
720    }
721
722    fn test_blob_resize<R: Runner>(runner: R)
723    where
724        R::Context: Storage,
725    {
726        runner.start(|context| async move {
727            let partition = "test_partition_resize";
728            let name = b"test_blob_resize";
729
730            // Open and write to a new blob
731            let (blob, _) = context
732                .open(partition, name)
733                .await
734                .expect("Failed to open blob");
735
736            let data = b"some data";
737            blob.write_at(data.to_vec(), 0)
738                .await
739                .expect("Failed to write");
740            blob.sync().await.expect("Failed to sync after write");
741            blob.close()
742                .await
743                .expect("Failed to close blob after writing");
744
745            // Re-open and check length
746            let (blob, len) = context.open(partition, name).await.unwrap();
747            assert_eq!(len, data.len() as u64);
748
749            // Resize to extend the file
750            let new_len = (data.len() as u64) * 2;
751            blob.resize(new_len)
752                .await
753                .expect("Failed to resize to extend");
754            blob.sync().await.expect("Failed to sync after resize");
755            blob.close()
756                .await
757                .expect("Failed to close blob after resizing");
758
759            // Re-open and check length again
760            let (blob, len) = context.open(partition, name).await.unwrap();
761            assert_eq!(len, new_len);
762
763            // Read original data
764            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
765            assert_eq!(read_buf.as_ref(), data);
766
767            // Read extended part (should be zeros)
768            let extended_part = blob
769                .read_at(vec![0; data.len()], data.len() as u64)
770                .await
771                .unwrap();
772            assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
773
774            // Truncate the blob
775            blob.resize(data.len() as u64).await.unwrap();
776            blob.sync().await.unwrap();
777            blob.close().await.unwrap();
778
779            // Reopen to check truncation
780            let (blob, size) = context.open(partition, name).await.unwrap();
781            assert_eq!(size, data.len() as u64);
782
783            // Read truncated data
784            let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
785            assert_eq!(read_buf.as_ref(), data);
786            blob.close().await.unwrap();
787        });
788    }
789
790    fn test_many_partition_read_write<R: Runner>(runner: R)
791    where
792        R::Context: Storage,
793    {
794        runner.start(|context| async move {
795            let partitions = ["partition1", "partition2", "partition3"];
796            let name = b"test_blob_rw";
797            let data1 = b"Hello";
798            let data2 = b"World";
799
800            for (additional, partition) in partitions.iter().enumerate() {
801                // Open a new blob
802                let (blob, _) = context
803                    .open(partition, name)
804                    .await
805                    .expect("Failed to open blob");
806
807                // Write data at different offsets
808                blob.write_at(Vec::from(data1), 0)
809                    .await
810                    .expect("Failed to write data1");
811                blob.write_at(Vec::from(data2), 5 + additional as u64)
812                    .await
813                    .expect("Failed to write data2");
814
815                // Close the blob
816                blob.close().await.expect("Failed to close blob");
817            }
818
819            for (additional, partition) in partitions.iter().enumerate() {
820                // Open a new blob
821                let (blob, len) = context
822                    .open(partition, name)
823                    .await
824                    .expect("Failed to open blob");
825                assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
826
827                // Read data back
828                let read = blob
829                    .read_at(vec![0u8; 10 + additional], 0)
830                    .await
831                    .expect("Failed to read data");
832                assert_eq!(&read.as_ref()[..5], b"Hello");
833                assert_eq!(&read.as_ref()[5 + additional..], b"World");
834
835                // Close the blob
836                blob.close().await.expect("Failed to close blob");
837            }
838        });
839    }
840
841    fn test_blob_read_past_length<R: Runner>(runner: R)
842    where
843        R::Context: Storage,
844    {
845        runner.start(|context| async move {
846            let partition = "test_partition";
847            let name = b"test_blob_rw";
848
849            // Open a new blob
850            let (blob, _) = context
851                .open(partition, name)
852                .await
853                .expect("Failed to open blob");
854
855            // Read data past file length (empty file)
856            let result = blob.read_at(vec![0u8; 10], 0).await;
857            assert!(result.is_err());
858
859            // Write data to the blob
860            let data = b"Hello, Storage!".to_vec();
861            blob.write_at(data, 0)
862                .await
863                .expect("Failed to write to blob");
864
865            // Read data past file length (non-empty file)
866            let result = blob.read_at(vec![0u8; 20], 0).await;
867            assert!(result.is_err());
868        })
869    }
870
871    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
872    where
873        R::Context: Spawner + Storage + Metrics,
874    {
875        runner.start(|context| async move {
876            let partition = "test_partition";
877            let name = b"test_blob_rw";
878
879            // Open a new blob
880            let (blob, _) = context
881                .open(partition, name)
882                .await
883                .expect("Failed to open blob");
884
885            // Write data to the blob
886            let data = b"Hello, Storage!";
887            blob.write_at(Vec::from(data), 0)
888                .await
889                .expect("Failed to write to blob");
890
891            // Sync the blob
892            blob.sync().await.expect("Failed to sync blob");
893
894            // Read data from the blob in clone
895            let check1 = context.with_label("check1").spawn({
896                let blob = blob.clone();
897                move |_| async move {
898                    let read = blob
899                        .read_at(vec![0u8; data.len()], 0)
900                        .await
901                        .expect("Failed to read from blob");
902                    assert_eq!(read.as_ref(), data);
903                }
904            });
905            let check2 = context.with_label("check2").spawn({
906                let blob = blob.clone();
907                move |_| async move {
908                    let read = blob
909                        .read_at(vec![0; data.len()], 0)
910                        .await
911                        .expect("Failed to read from blob");
912                    assert_eq!(read.as_ref(), data);
913                }
914            });
915
916            // Wait for both reads to complete
917            let result = join!(check1, check2);
918            assert!(result.0.is_ok());
919            assert!(result.1.is_ok());
920
921            // Read data from the blob
922            let read = blob
923                .read_at(vec![0; data.len()], 0)
924                .await
925                .expect("Failed to read from blob");
926            assert_eq!(read.as_ref(), data);
927
928            // Close the blob
929            blob.close().await.expect("Failed to close blob");
930
931            // Ensure no blobs still open
932            let buffer = context.encode();
933            assert!(buffer.contains("open_blobs 0"));
934        });
935    }
936
937    fn test_shutdown<R: Runner>(runner: R)
938    where
939        R::Context: Spawner + Metrics + Clock,
940    {
941        let kill = 9;
942        runner.start(|context| async move {
943            // Spawn a task that waits for signal
944            let before = context
945                .with_label("before")
946                .spawn(move |context| async move {
947                    let sig = context.stopped().await;
948                    assert_eq!(sig.unwrap(), kill);
949                });
950
951            // Spawn a task after stop is called
952            let after = context
953                .with_label("after")
954                .spawn(move |context| async move {
955                    // Wait for stop signal
956                    let mut signal = context.stopped();
957                    loop {
958                        select! {
959                            sig = &mut signal => {
960                                // Stopper resolved
961                                assert_eq!(sig.unwrap(), kill);
962                                break;
963                            },
964                            _ = context.sleep(Duration::from_millis(10)) => {
965                                // Continue waiting
966                            },
967                        }
968                    }
969                });
970
971            // Sleep for a bit before stopping
972            context.sleep(Duration::from_millis(50)).await;
973
974            // Signal the task
975            context.stop(kill);
976
977            // Ensure both tasks complete
978            let result = join!(before, after);
979            assert!(result.0.is_ok());
980            assert!(result.1.is_ok());
981        });
982    }
983
984    fn test_spawn_ref<R: Runner>(runner: R)
985    where
986        R::Context: Spawner,
987    {
988        runner.start(|mut context| async move {
989            let handle = context.spawn_ref();
990            let result = handle(async move { 42 }).await;
991            assert!(matches!(result, Ok(42)));
992        });
993    }
994
995    fn test_spawn_ref_duplicate<R: Runner>(runner: R)
996    where
997        R::Context: Spawner,
998    {
999        runner.start(|mut context| async move {
1000            let handle = context.spawn_ref();
1001            let result = handle(async move { 42 }).await;
1002            assert!(matches!(result, Ok(42)));
1003
1004            // Ensure context is consumed
1005            let handle = context.spawn_ref();
1006            let result = handle(async move { 42 }).await;
1007            assert!(matches!(result, Ok(42)));
1008        });
1009    }
1010
1011    fn test_spawn_duplicate<R: Runner>(runner: R)
1012    where
1013        R::Context: Spawner,
1014    {
1015        runner.start(|mut context| async move {
1016            let handle = context.spawn_ref();
1017            let result = handle(async move { 42 }).await;
1018            assert!(matches!(result, Ok(42)));
1019
1020            // Ensure context is consumed
1021            context.spawn(|_| async move { 42 });
1022        });
1023    }
1024
1025    fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1026    where
1027        R::Context: Spawner,
1028    {
1029        runner.start(|context| async move {
1030            let handle = context.spawn_blocking(dedicated, |_| 42);
1031            let result = handle.await;
1032            assert!(matches!(result, Ok(42)));
1033        });
1034    }
1035
1036    fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1037    where
1038        R::Context: Spawner,
1039    {
1040        runner.start(|mut context| async move {
1041            let spawn = context.spawn_blocking_ref(dedicated);
1042            let handle = spawn(|| 42);
1043            let result = handle.await;
1044            assert!(matches!(result, Ok(42)));
1045        });
1046    }
1047
1048    fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1049    where
1050        R::Context: Spawner,
1051    {
1052        runner.start(|mut context| async move {
1053            let spawn = context.spawn_blocking_ref(dedicated);
1054            let result = spawn(|| 42).await;
1055            assert!(matches!(result, Ok(42)));
1056
1057            // Ensure context is consumed
1058            context.spawn_blocking(dedicated, |_| 42);
1059        });
1060    }
1061
1062    fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1063    where
1064        R::Context: Spawner,
1065    {
1066        runner.start(|context| async move {
1067            // Create task
1068            let (sender, mut receiver) = oneshot::channel();
1069            let handle = context.spawn_blocking(dedicated, move |_| {
1070                // Wait for abort to be called
1071                loop {
1072                    if receiver.try_recv().is_ok() {
1073                        break;
1074                    }
1075                }
1076
1077                // Perform a long-running operation
1078                let mut count = 0;
1079                loop {
1080                    count += 1;
1081                    if count >= 100_000_000 {
1082                        break;
1083                    }
1084                }
1085                count
1086            });
1087
1088            // Abort the task
1089            //
1090            // If there was an `.await` prior to sending a message over the oneshot, this test
1091            // could deadlock (depending on the runtime implementation) because the blocking task
1092            // would never yield (preventing send from being called).
1093            handle.abort();
1094            sender.send(()).unwrap();
1095
1096            // Wait for the task to complete
1097            assert!(matches!(handle.await, Ok(100_000_000)));
1098        });
1099    }
1100
1101    fn test_metrics<R: Runner>(runner: R)
1102    where
1103        R::Context: Metrics,
1104    {
1105        runner.start(|context| async move {
1106            // Assert label
1107            assert_eq!(context.label(), "");
1108
1109            // Register a metric
1110            let counter = Counter::<u64>::default();
1111            context.register("test", "test", counter.clone());
1112
1113            // Increment the counter
1114            counter.inc();
1115
1116            // Encode metrics
1117            let buffer = context.encode();
1118            assert!(buffer.contains("test_total 1"));
1119
1120            // Nested context
1121            let context = context.with_label("nested");
1122            let nested_counter = Counter::<u64>::default();
1123            context.register("test", "test", nested_counter.clone());
1124
1125            // Increment the counter
1126            nested_counter.inc();
1127
1128            // Encode metrics
1129            let buffer = context.encode();
1130            assert!(buffer.contains("nested_test_total 1"));
1131            assert!(buffer.contains("test_total 1"));
1132        });
1133    }
1134
1135    fn test_metrics_label<R: Runner>(runner: R)
1136    where
1137        R::Context: Metrics,
1138    {
1139        runner.start(|context| async move {
1140            context.with_label(METRICS_PREFIX);
1141        })
1142    }
1143
/// Runs the shared `test_error_future` scenario on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1149
/// Runs the shared `test_clock_sleep` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1155
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1161
/// Runs the shared `test_root_finishes` scenario on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1167
/// Runs the shared `test_spawn_abort` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1173
/// Runs the shared `test_panic_aborts_root` scenario on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1179
/// Runs the shared `test_panic_aborts_spawn` scenario on the deterministic
/// runtime; the test passes only if it panics with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1186
/// Runs the shared `test_select` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1192
/// Runs the shared `test_select_loop` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1198
/// Runs the shared `test_storage_operations` scenario on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1204
/// Runs the shared `test_blob_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1210
/// Runs the shared `test_blob_resize` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
1216
/// Runs the shared `test_many_partition_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1222
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1228
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1235
/// Runs the shared `test_shutdown` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1241
/// Runs the shared `test_spawn_ref` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1247
/// Runs the shared `test_spawn_ref_duplicate` scenario on the deterministic
/// runtime; the test passes only if it panics.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1254
/// Runs the shared `test_spawn_duplicate` scenario on the deterministic
/// runtime; the test passes only if it panics.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1261
/// Runs the shared `test_spawn_blocking` scenario on a fresh deterministic
/// runtime for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    });
}
1269
/// Checks that a panic inside a blocking task surfaces as a panic out of the
/// deterministic runtime, for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` instead of relying on
/// `#[should_panic]`: with `#[should_panic]` the first iteration's panic ends
/// the test, so the `dedicated = true` case would never execute.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            executor.start(|context| async move {
                let handle = context.spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                });
                handle.await.unwrap();
            });
        }));

        // The run must have panicked
        let payload = match outcome {
            Ok(()) => panic!("expected a panic (dedicated = {dedicated})"),
            Err(payload) => payload,
        };

        // Panic payloads are either `&str` or `String`; accept both when checking the message
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message (dedicated = {dedicated}): {message}"
        );
    }
}
1283
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh
/// deterministic runtime for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated);
    });
}
1291
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh
/// deterministic runtime for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated);
    });
}
1299
/// Checks that the shared `test_spawn_blocking_ref_duplicate` scenario panics
/// on the deterministic runtime for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` instead of relying on
/// `#[should_panic]`: with `#[should_panic]` the first iteration's panic ends
/// the test, so the `dedicated = true` case would never execute.
#[test]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic when the context is reused (dedicated = {dedicated})"
        );
    }
}
1308
/// Runs the shared `test_metrics` scenario on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1314
/// Runs the shared `test_metrics_label` scenario on the deterministic
/// runtime; the test passes only if it panics.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1321
/// Runs the shared `test_error_future` scenario on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1327
/// Runs the shared `test_clock_sleep` scenario on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1333
/// Runs the shared `test_clock_sleep_until` scenario on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1339
/// Runs the shared `test_root_finishes` scenario on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1345
/// Runs the shared `test_spawn_abort` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1351
/// Runs the shared `test_panic_aborts_root` scenario on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1357
/// Runs the shared `test_panic_aborts_spawn` scenario on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1363
/// Runs the shared `test_select` scenario on the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1369
/// Runs the shared `test_select_loop` scenario on the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1375
/// Runs the shared `test_storage_operations` scenario on the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1381
/// Runs the shared `test_blob_read_write` scenario on the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1387
/// Runs the shared `test_blob_resize` scenario on the tokio runtime.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
1393
/// Runs the shared `test_many_partition_read_write` scenario on the tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1399
/// Runs the shared `test_blob_read_past_length` scenario on the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1405
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// tokio runtime.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1412
/// Runs the shared `test_shutdown` scenario on the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1418
/// Runs the shared `test_spawn_ref` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1424
/// Runs the shared `test_spawn_ref_duplicate` scenario on the tokio runtime;
/// the test passes only if it panics.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1431
/// Runs the shared `test_spawn_duplicate` scenario on the tokio runtime; the
/// test passes only if it panics.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1438
/// Runs the shared `test_spawn_blocking` scenario on a fresh tokio runtime
/// for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    });
}
1446
/// On the tokio runtime, a panic inside a blocking task must surface through
/// the task's handle as `Error::Exited`, for both values of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        tokio::Runner::default().start(|context| async move {
            // Spawn a blocking task that panics immediately and await its handle
            let outcome = context
                .spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                })
                .await;
            assert!(matches!(outcome, Err(Error::Exited)));
        });
    }
}
1460
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh tokio
/// runtime for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    });
}
1468
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh tokio
/// runtime for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    });
}
1476
/// Checks that the shared `test_spawn_blocking_ref_duplicate` scenario panics
/// on the tokio runtime for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` instead of relying on
/// `#[should_panic]`: with `#[should_panic]` the first iteration's panic ends
/// the test, so the `dedicated = true` case would never execute.
#[test]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = tokio::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic when the context is reused (dedicated = {dedicated})"
        );
    }
}
1485
/// Runs the shared `test_metrics` scenario on the tokio runtime.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1491
/// Runs the shared `test_metrics_label` scenario on the tokio runtime; the
/// test passes only if it panics.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1498
/// End-to-end check of the tokio telemetry endpoint: initializes telemetry
/// with a metrics server address, registers and increments a counter, then
/// issues a raw HTTP `GET /metrics` request from a spawned client task and
/// verifies the counter appears in the response body.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Define the server address
        //
        // NOTE(review): the port is fixed, so this test presumably fails if something else
        // is already bound to 127.0.0.1:8000 — confirm this is acceptable in CI.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Configure telemetry
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a test metric
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        // Helper functions to parse HTTP response

        /// Reads one byte at a time until `\n`, returning the line without its
        /// trailing `\n` (and without a preceding `\r`, if present).
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop(); // Remove trailing \r
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines until the first empty line.
        ///
        /// Header names are stored lowercased (HTTP header names are
        /// case-insensitive) and values are trimmed (whitespace around the
        /// value is optional), so lookups do not depend on how the server
        /// happens to format its headers. Lines without a `:` are skipped.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(':') {
                    headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
        }

        // Simulate a client connecting to the server
        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            // The client may be polled before the server is ready, that's alright!
                            error!(err =?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send a GET request to the server
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Read and verify the HTTP status line
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Read and parse headers (names are lowercased by `read_headers`)
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Read and verify the body
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Wait for the client task to complete
        client_handle.await.unwrap();
    });
}
1606}