1#![doc(
21 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22 html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use bytes::{Buf, BufMut};
26use commonware_macros::select;
27use commonware_parallel::{Rayon, ThreadPool};
28use commonware_utils::StableBuf;
29use prometheus_client::registry::Metric;
30use rayon::ThreadPoolBuildError;
31use std::{
32 future::Future,
33 io::Error as IoError,
34 net::SocketAddr,
35 num::NonZeroUsize,
36 time::{Duration, SystemTime},
37};
38use thiserror::Error;
39
40#[macro_use]
41mod macros;
42
43pub mod deterministic;
44pub mod mocks;
45cfg_if::cfg_if! {
46 if #[cfg(not(target_arch = "wasm32"))] {
47 pub mod tokio;
48 pub mod benchmarks;
49 }
50}
51mod network;
52mod process;
53mod storage;
54pub mod telemetry;
55pub mod utils;
56pub use utils::*;
57#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
58mod iouring;
59
60const METRICS_PREFIX: &str = "runtime";
62
63pub const DEFAULT_BLOB_VERSION: u16 = 0;
65
66#[derive(Error, Debug)]
68pub enum Error {
69 #[error("exited")]
70 Exited,
71 #[error("closed")]
72 Closed,
73 #[error("timeout")]
74 Timeout,
75 #[error("bind failed")]
76 BindFailed,
77 #[error("connection failed")]
78 ConnectionFailed,
79 #[error("write failed")]
80 WriteFailed,
81 #[error("read failed")]
82 ReadFailed,
83 #[error("send failed")]
84 SendFailed,
85 #[error("recv failed")]
86 RecvFailed,
87 #[error("dns resolution failed: {0}")]
88 ResolveFailed(String),
89 #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
90 PartitionNameInvalid(String),
91 #[error("partition creation failed: {0}")]
92 PartitionCreationFailed(String),
93 #[error("partition missing: {0}")]
94 PartitionMissing(String),
95 #[error("partition corrupt: {0}")]
96 PartitionCorrupt(String),
97 #[error("blob open failed: {0}/{1} error: {2}")]
98 BlobOpenFailed(String, String, IoError),
99 #[error("blob missing: {0}/{1}")]
100 BlobMissing(String, String),
101 #[error("blob resize failed: {0}/{1} error: {2}")]
102 BlobResizeFailed(String, String, IoError),
103 #[error("blob sync failed: {0}/{1} error: {2}")]
104 BlobSyncFailed(String, String, IoError),
105 #[error("blob insufficient length")]
106 BlobInsufficientLength,
107 #[error("blob corrupt: {0}/{1} reason: {2}")]
108 BlobCorrupt(String, String, String),
109 #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
110 BlobVersionMismatch {
111 expected: std::ops::RangeInclusive<u16>,
112 found: u16,
113 },
114 #[error("invalid or missing checksum")]
115 InvalidChecksum,
116 #[error("offset overflow")]
117 OffsetOverflow,
118 #[error("immutable blob")]
119 ImmutableBlob,
120 #[error("io error: {0}")]
121 Io(#[from] IoError),
122}
123
124pub trait Runner {
127 type Context;
133
134 fn start<F, Fut>(self, f: F) -> Fut::Output
140 where
141 F: FnOnce(Self::Context) -> Fut,
142 Fut: Future;
143}
144
145pub trait Spawner: Clone + Send + Sync + 'static {
147 fn shared(self, blocking: bool) -> Self;
156
157 fn dedicated(self) -> Self;
164
165 fn instrumented(self) -> Self;
167
168 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
202 where
203 F: FnOnce(Self) -> Fut + Send + 'static,
204 Fut: Future<Output = T> + Send + 'static,
205 T: Send + 'static;
206
207 fn stop(
227 self,
228 value: i32,
229 timeout: Option<Duration>,
230 ) -> impl Future<Output = Result<(), Error>> + Send;
231
232 fn stopped(&self) -> signal::Signal;
239}
240
241pub trait RayonPoolSpawner: Spawner + Metrics {
244 fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError>;
253
254 fn create_strategy(&self, concurrency: NonZeroUsize) -> Result<Rayon, ThreadPoolBuildError> {
263 self.create_pool(concurrency).map(Rayon::with_pool)
264 }
265}
266
267pub trait Metrics: Clone + Send + Sync + 'static {
269 fn label(&self) -> String;
271
272 fn with_label(&self, label: &str) -> Self;
280
281 fn scoped_label(&self, label: &str) -> String {
285 let label = if self.label().is_empty() {
286 label.to_string()
287 } else {
288 format!("{}_{}", self.label(), label)
289 };
290 assert!(
291 !label.starts_with(METRICS_PREFIX),
292 "using runtime label is not allowed"
293 );
294 label
295 }
296
297 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
301
302 fn encode(&self) -> String;
304}
305
306pub use governor::Quota;
308
309pub type RateLimiter<C> = governor::RateLimiter<
314 governor::state::NotKeyed,
315 governor::state::InMemoryState,
316 C,
317 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
318>;
319
320pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
327 K,
328 governor::state::keyed::HashMapStateStore<K>,
329 C,
330 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
331>;
332
333pub trait Clock:
339 governor::clock::Clock<Instant = SystemTime>
340 + governor::clock::ReasonablyRealtime
341 + Clone
342 + Send
343 + Sync
344 + 'static
345{
346 fn current(&self) -> SystemTime;
348
349 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
351
352 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
354
355 fn timeout<F, T>(
376 &self,
377 duration: Duration,
378 future: F,
379 ) -> impl Future<Output = Result<T, Error>> + Send + '_
380 where
381 F: Future<Output = T> + Send + 'static,
382 T: Send + 'static,
383 {
384 async move {
385 select! {
386 result = future => {
387 Ok(result)
388 },
389 _ = self.sleep(duration) => {
390 Err(Error::Timeout)
391 },
392 }
393 }
394 }
395}
396
397cfg_if::cfg_if! {
398 if #[cfg(feature = "external")] {
399 pub trait Pacer: Clock + Clone + Send + Sync + 'static {
401 fn pace<'a, F, T>(
421 &'a self,
422 latency: Duration,
423 future: F,
424 ) -> impl Future<Output = T> + Send + 'a
425 where
426 F: Future<Output = T> + Send + 'a,
427 T: Send + 'a;
428 }
429
430 pub trait FutureExt: Future + Send + Sized {
435 fn pace<'a, E>(
437 self,
438 pacer: &'a E,
439 latency: Duration,
440 ) -> impl Future<Output = Self::Output> + Send + 'a
441 where
442 E: Pacer + 'a,
443 Self: Send + 'a,
444 Self::Output: Send + 'a,
445 {
446 pacer.pace(latency, self)
447 }
448 }
449
450 impl<F> FutureExt for F where F: Future + Send {}
451 }
452}
453
454pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
456
457pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
459
460pub type ListenerOf<N> = <N as crate::Network>::Listener;
462
463pub trait Network: Clone + Send + Sync + 'static {
466 type Listener: Listener;
470
471 fn bind(
473 &self,
474 socket: SocketAddr,
475 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
476
477 fn dial(
479 &self,
480 socket: SocketAddr,
481 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
482}
483
484pub trait Resolver: Clone + Send + Sync + 'static {
486 fn resolve(
490 &self,
491 host: &str,
492 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
493}
494
495pub trait Listener: Sync + Send + 'static {
498 type Sink: Sink;
501 type Stream: Stream;
504
505 fn accept(
507 &mut self,
508 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
509
510 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
512}
513
514pub trait Sink: Sync + Send + 'static {
517 fn send(&mut self, msg: impl Buf + Send) -> impl Future<Output = Result<(), Error>> + Send;
523}
524
525pub trait Stream: Sync + Send + 'static {
528 fn recv(&mut self, buf: impl BufMut + Send) -> impl Future<Output = Result<(), Error>> + Send;
535}
536
537pub trait Storage: Clone + Send + Sync + 'static {
550 type Blob: Blob;
552
553 fn open(
556 &self,
557 partition: &str,
558 name: &[u8],
559 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
560 async move {
561 let (blob, size, _) = self
562 .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
563 .await?;
564 Ok((blob, size))
565 }
566 }
567
568 fn open_versioned(
585 &self,
586 partition: &str,
587 name: &[u8],
588 versions: std::ops::RangeInclusive<u16>,
589 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
590
591 fn remove(
597 &self,
598 partition: &str,
599 name: Option<&[u8]>,
600 ) -> impl Future<Output = Result<(), Error>> + Send;
601
602 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
604}
605
606#[allow(clippy::len_without_is_empty)]
621pub trait Blob: Clone + Send + Sync + 'static {
622 fn read_at(
627 &self,
628 buf: impl Into<StableBuf> + Send,
629 offset: u64,
630 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
631
632 fn write_at(
634 &self,
635 buf: impl Into<StableBuf> + Send,
636 offset: u64,
637 ) -> impl Future<Output = Result<(), Error>> + Send;
638
639 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
644
645 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
647}
648
649#[cfg(test)]
650mod tests {
651 use super::*;
652 use crate::telemetry::traces::collector::TraceStorage;
653 use bytes::Bytes;
654 use commonware_macros::{select, test_collect_traces};
655 use commonware_utils::NZUsize;
656 use futures::{
657 channel::{mpsc, oneshot},
658 future::{pending, ready},
659 join, pin_mut, FutureExt, SinkExt, StreamExt,
660 };
661 use prometheus_client::metrics::counter::Counter;
662 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
663 use std::{
664 collections::HashMap,
665 net::{IpAddr, Ipv4Addr, Ipv6Addr},
666 pin::Pin,
667 str::FromStr,
668 sync::{
669 atomic::{AtomicU32, Ordering},
670 Arc, Mutex,
671 },
672 task::{Context as TContext, Poll, Waker},
673 };
674 use tracing::{error, Level};
675 use utils::reschedule;
676
677 fn test_error_future<R: Runner>(runner: R) {
678 async fn error_future() -> Result<&'static str, &'static str> {
679 Err("An error occurred")
680 }
681 let result = runner.start(|_| error_future());
682 assert_eq!(result, Err("An error occurred"));
683 }
684
685 fn test_clock_sleep<R: Runner>(runner: R)
686 where
687 R::Context: Spawner + Clock,
688 {
689 runner.start(|context| async move {
690 let start = context.current();
692 let sleep_duration = Duration::from_millis(10);
693 context.sleep(sleep_duration).await;
694
695 let end = context.current();
697 assert!(end.duration_since(start).unwrap() >= sleep_duration);
698 });
699 }
700
701 fn test_clock_sleep_until<R: Runner>(runner: R)
702 where
703 R::Context: Spawner + Clock + Metrics,
704 {
705 runner.start(|context| async move {
706 let now = context.current();
708 context.sleep_until(now + Duration::from_millis(100)).await;
709
710 let elapsed = now.elapsed().unwrap();
712 assert!(elapsed >= Duration::from_millis(100));
713 });
714 }
715
716 fn test_clock_timeout<R: Runner>(runner: R)
717 where
718 R::Context: Spawner + Clock,
719 {
720 runner.start(|context| async move {
721 let result = context
723 .timeout(Duration::from_millis(100), async { "success" })
724 .await;
725 assert_eq!(result.unwrap(), "success");
726
727 let result = context
729 .timeout(Duration::from_millis(50), pending::<()>())
730 .await;
731 assert!(matches!(result, Err(Error::Timeout)));
732
733 let result = context
735 .timeout(
736 Duration::from_millis(100),
737 context.sleep(Duration::from_millis(50)),
738 )
739 .await;
740 assert!(result.is_ok());
741 });
742 }
743
744 fn test_root_finishes<R: Runner>(runner: R)
745 where
746 R::Context: Spawner,
747 {
748 runner.start(|context| async move {
749 context.spawn(|_| async move {
750 loop {
751 reschedule().await;
752 }
753 });
754 });
755 }
756
757 fn test_spawn_after_abort<R>(runner: R)
758 where
759 R: Runner,
760 R::Context: Spawner + Clone,
761 {
762 runner.start(|context| async move {
763 let child = context.clone();
765
766 let parent_handle = context.spawn(move |_| async move {
768 pending::<()>().await;
769 });
770 parent_handle.abort();
771
772 let child_handle = child.spawn(move |_| async move {
774 pending::<()>().await;
775 });
776 assert!(matches!(child_handle.await, Err(Error::Closed)));
777 });
778 }
779
780 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
781 where
782 R::Context: Spawner,
783 {
784 runner.start(|context| async move {
785 let context = if dedicated {
786 assert!(!blocking);
787 context.dedicated()
788 } else {
789 context.shared(blocking)
790 };
791
792 let handle = context.spawn(|_| async move {
793 loop {
794 reschedule().await;
795 }
796 });
797 handle.abort();
798 assert!(matches!(handle.await, Err(Error::Closed)));
799 });
800 }
801
802 fn test_panic_aborts_root<R: Runner>(runner: R) {
803 let result: Result<(), Error> = runner.start(|_| async move {
804 panic!("blah");
805 });
806 result.unwrap_err();
807 }
808
809 fn test_panic_aborts_spawn<R: Runner>(runner: R)
810 where
811 R::Context: Spawner + Clock,
812 {
813 runner.start(|context| async move {
814 context.clone().spawn(|_| async move {
815 panic!("blah");
816 });
817
818 loop {
820 context.sleep(Duration::from_millis(100)).await;
821 }
822 });
823 }
824
825 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
826 where
827 R::Context: Spawner + Clock,
828 {
829 let result: Result<(), Error> = runner.start(|context| async move {
830 let result = context.clone().spawn(|_| async move {
831 panic!("blah");
832 });
833 result.await
834 });
835 assert!(matches!(result, Err(Error::Exited)));
836 }
837
838 fn test_multiple_panics<R: Runner>(runner: R)
839 where
840 R::Context: Spawner + Clock,
841 {
842 runner.start(|context| async move {
843 context.clone().spawn(|_| async move {
844 panic!("boom 1");
845 });
846 context.clone().spawn(|_| async move {
847 panic!("boom 2");
848 });
849 context.clone().spawn(|_| async move {
850 panic!("boom 3");
851 });
852
853 loop {
855 context.sleep(Duration::from_millis(100)).await;
856 }
857 });
858 }
859
860 fn test_multiple_panics_caught<R: Runner>(runner: R)
861 where
862 R::Context: Spawner + Clock,
863 {
864 let (res1, res2, res3) = runner.start(|context| async move {
865 let handle1 = context.clone().spawn(|_| async move {
866 panic!("boom 1");
867 });
868 let handle2 = context.clone().spawn(|_| async move {
869 panic!("boom 2");
870 });
871 let handle3 = context.clone().spawn(|_| async move {
872 panic!("boom 3");
873 });
874
875 join!(handle1, handle2, handle3)
876 });
877 assert!(matches!(res1, Err(Error::Exited)));
878 assert!(matches!(res2, Err(Error::Exited)));
879 assert!(matches!(res3, Err(Error::Exited)));
880 }
881
882 fn test_select<R: Runner>(runner: R) {
883 runner.start(|_| async move {
884 let output = Mutex::new(0);
886 select! {
887 v1 = ready(1) => {
888 *output.lock().unwrap() = v1;
889 },
890 v2 = ready(2) => {
891 *output.lock().unwrap() = v2;
892 },
893 };
894 assert_eq!(*output.lock().unwrap(), 1);
895
896 select! {
898 v1 = std::future::pending::<i32>() => {
899 *output.lock().unwrap() = v1;
900 },
901 v2 = ready(2) => {
902 *output.lock().unwrap() = v2;
903 },
904 };
905 assert_eq!(*output.lock().unwrap(), 2);
906 });
907 }
908
909 fn test_select_loop<R: Runner>(runner: R)
911 where
912 R::Context: Clock,
913 {
914 runner.start(|context| async move {
915 let (mut sender, mut receiver) = mpsc::unbounded();
917 for _ in 0..2 {
918 select! {
919 v = receiver.next() => {
920 panic!("unexpected value: {v:?}");
921 },
922 _ = context.sleep(Duration::from_millis(100)) => {
923 continue;
924 },
925 };
926 }
927
928 sender.send(0).await.unwrap();
930 sender.send(1).await.unwrap();
931
932 select! {
934 _ = async {} => {
935 },
937 v = receiver.next() => {
938 panic!("unexpected value: {v:?}");
939 },
940 };
941
942 for i in 0..2 {
944 select! {
945 _ = context.sleep(Duration::from_millis(100)) => {
946 panic!("timeout");
947 },
948 v = receiver.next() => {
949 assert_eq!(v.unwrap(), i);
950 },
951 };
952 }
953 });
954 }
955
956 fn test_storage_operations<R: Runner>(runner: R)
957 where
958 R::Context: Storage,
959 {
960 runner.start(|context| async move {
961 let partition = "test_partition";
962 let name = b"test_blob";
963
964 let (blob, size) = context
966 .open(partition, name)
967 .await
968 .expect("Failed to open blob");
969 assert_eq!(size, 0, "new blob should have size 0");
970
971 let data = b"Hello, Storage!";
973 blob.write_at(Vec::from(data), 0)
974 .await
975 .expect("Failed to write to blob");
976
977 blob.sync().await.expect("Failed to sync blob");
979
980 let read = blob
982 .read_at(vec![0; data.len()], 0)
983 .await
984 .expect("Failed to read from blob");
985 assert_eq!(read.as_ref(), data);
986
987 blob.sync().await.expect("Failed to sync blob");
989
990 let blobs = context
992 .scan(partition)
993 .await
994 .expect("Failed to scan partition");
995 assert!(blobs.contains(&name.to_vec()));
996
997 let (blob, len) = context
999 .open(partition, name)
1000 .await
1001 .expect("Failed to reopen blob");
1002 assert_eq!(len, data.len() as u64);
1003
1004 let read = blob
1006 .read_at(vec![0u8; 7], 7)
1007 .await
1008 .expect("Failed to read data");
1009 assert_eq!(read.as_ref(), b"Storage");
1010
1011 blob.sync().await.expect("Failed to sync blob");
1013
1014 context
1016 .remove(partition, Some(name))
1017 .await
1018 .expect("Failed to remove blob");
1019
1020 let blobs = context
1022 .scan(partition)
1023 .await
1024 .expect("Failed to scan partition");
1025 assert!(!blobs.contains(&name.to_vec()));
1026
1027 context
1029 .remove(partition, None)
1030 .await
1031 .expect("Failed to remove partition");
1032
1033 let result = context.scan(partition).await;
1035 assert!(matches!(result, Err(Error::PartitionMissing(_))));
1036 });
1037 }
1038
1039 fn test_blob_read_write<R: Runner>(runner: R)
1040 where
1041 R::Context: Storage,
1042 {
1043 runner.start(|context| async move {
1044 let partition = "test_partition";
1045 let name = b"test_blob_rw";
1046
1047 let (blob, _) = context
1049 .open(partition, name)
1050 .await
1051 .expect("Failed to open blob");
1052
1053 let data1 = b"Hello";
1055 let data2 = b"World";
1056 blob.write_at(Vec::from(data1), 0)
1057 .await
1058 .expect("Failed to write data1");
1059 blob.write_at(Vec::from(data2), 5)
1060 .await
1061 .expect("Failed to write data2");
1062
1063 let read = blob
1065 .read_at(vec![0u8; 10], 0)
1066 .await
1067 .expect("Failed to read data");
1068 assert_eq!(&read.as_ref()[..5], data1);
1069 assert_eq!(&read.as_ref()[5..], data2);
1070
1071 let result = blob.read_at(vec![0u8; 10], 10).await;
1073 assert!(result.is_err());
1074
1075 let data3 = b"Store";
1077 blob.write_at(Vec::from(data3), 5)
1078 .await
1079 .expect("Failed to write data3");
1080
1081 let read = blob
1083 .read_at(vec![0u8; 10], 0)
1084 .await
1085 .expect("Failed to read data");
1086 assert_eq!(&read.as_ref()[..5], data1);
1087 assert_eq!(&read.as_ref()[5..], data3);
1088
1089 let result = blob.read_at(vec![0u8; 10], 10).await;
1091 assert!(result.is_err());
1092 });
1093 }
1094
1095 fn test_blob_resize<R: Runner>(runner: R)
1096 where
1097 R::Context: Storage,
1098 {
1099 runner.start(|context| async move {
1100 let partition = "test_partition_resize";
1101 let name = b"test_blob_resize";
1102
1103 let (blob, _) = context
1105 .open(partition, name)
1106 .await
1107 .expect("Failed to open blob");
1108
1109 let data = b"some data";
1110 blob.write_at(data.to_vec(), 0)
1111 .await
1112 .expect("Failed to write");
1113 blob.sync().await.expect("Failed to sync after write");
1114
1115 let (blob, len) = context.open(partition, name).await.unwrap();
1117 assert_eq!(len, data.len() as u64);
1118
1119 let new_len = (data.len() as u64) * 2;
1121 blob.resize(new_len)
1122 .await
1123 .expect("Failed to resize to extend");
1124 blob.sync().await.expect("Failed to sync after resize");
1125
1126 let (blob, len) = context.open(partition, name).await.unwrap();
1128 assert_eq!(len, new_len);
1129
1130 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1132 assert_eq!(read_buf.as_ref(), data);
1133
1134 let extended_part = blob
1136 .read_at(vec![0; data.len()], data.len() as u64)
1137 .await
1138 .unwrap();
1139 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1140
1141 blob.resize(data.len() as u64).await.unwrap();
1143 blob.sync().await.unwrap();
1144
1145 let (blob, size) = context.open(partition, name).await.unwrap();
1147 assert_eq!(size, data.len() as u64);
1148
1149 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1151 assert_eq!(read_buf.as_ref(), data);
1152 blob.sync().await.unwrap();
1153 });
1154 }
1155
1156 fn test_many_partition_read_write<R: Runner>(runner: R)
1157 where
1158 R::Context: Storage,
1159 {
1160 runner.start(|context| async move {
1161 let partitions = ["partition1", "partition2", "partition3"];
1162 let name = b"test_blob_rw";
1163 let data1 = b"Hello";
1164 let data2 = b"World";
1165
1166 for (additional, partition) in partitions.iter().enumerate() {
1167 let (blob, _) = context
1169 .open(partition, name)
1170 .await
1171 .expect("Failed to open blob");
1172
1173 blob.write_at(Vec::from(data1), 0)
1175 .await
1176 .expect("Failed to write data1");
1177 blob.write_at(Vec::from(data2), 5 + additional as u64)
1178 .await
1179 .expect("Failed to write data2");
1180
1181 blob.sync().await.expect("Failed to sync blob");
1183 }
1184
1185 for (additional, partition) in partitions.iter().enumerate() {
1186 let (blob, len) = context
1188 .open(partition, name)
1189 .await
1190 .expect("Failed to open blob");
1191 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1192
1193 let read = blob
1195 .read_at(vec![0u8; 10 + additional], 0)
1196 .await
1197 .expect("Failed to read data");
1198 assert_eq!(&read.as_ref()[..5], b"Hello");
1199 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1200 }
1201 });
1202 }
1203
1204 fn test_blob_read_past_length<R: Runner>(runner: R)
1205 where
1206 R::Context: Storage,
1207 {
1208 runner.start(|context| async move {
1209 let partition = "test_partition";
1210 let name = b"test_blob_rw";
1211
1212 let (blob, _) = context
1214 .open(partition, name)
1215 .await
1216 .expect("Failed to open blob");
1217
1218 let result = blob.read_at(vec![0u8; 10], 0).await;
1220 assert!(result.is_err());
1221
1222 let data = b"Hello, Storage!".to_vec();
1224 blob.write_at(data, 0)
1225 .await
1226 .expect("Failed to write to blob");
1227
1228 let result = blob.read_at(vec![0u8; 20], 0).await;
1230 assert!(result.is_err());
1231 })
1232 }
1233
1234 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1235 where
1236 R::Context: Spawner + Storage + Metrics,
1237 {
1238 runner.start(|context| async move {
1239 let partition = "test_partition";
1240 let name = b"test_blob_rw";
1241
1242 let (blob, _) = context
1244 .open(partition, name)
1245 .await
1246 .expect("Failed to open blob");
1247
1248 let data = b"Hello, Storage!";
1250 blob.write_at(Vec::from(data), 0)
1251 .await
1252 .expect("Failed to write to blob");
1253
1254 blob.sync().await.expect("Failed to sync blob");
1256
1257 let check1 = context.with_label("check1").spawn({
1259 let blob = blob.clone();
1260 move |_| async move {
1261 let read = blob
1262 .read_at(vec![0u8; data.len()], 0)
1263 .await
1264 .expect("Failed to read from blob");
1265 assert_eq!(read.as_ref(), data);
1266 }
1267 });
1268 let check2 = context.with_label("check2").spawn({
1269 let blob = blob.clone();
1270 move |_| async move {
1271 let read = blob
1272 .read_at(vec![0; data.len()], 0)
1273 .await
1274 .expect("Failed to read from blob");
1275 assert_eq!(read.as_ref(), data);
1276 }
1277 });
1278
1279 let result = join!(check1, check2);
1281 assert!(result.0.is_ok());
1282 assert!(result.1.is_ok());
1283
1284 let read = blob
1286 .read_at(vec![0; data.len()], 0)
1287 .await
1288 .expect("Failed to read from blob");
1289 assert_eq!(read.as_ref(), data);
1290
1291 drop(blob);
1293
1294 let buffer = context.encode();
1296 assert!(buffer.contains("open_blobs 0"));
1297 });
1298 }
1299
1300 fn test_shutdown<R: Runner>(runner: R)
1301 where
1302 R::Context: Spawner + Metrics + Clock,
1303 {
1304 let kill = 9;
1305 runner.start(|context| async move {
1306 let before = context
1308 .with_label("before")
1309 .spawn(move |context| async move {
1310 let mut signal = context.stopped();
1311 let value = (&mut signal).await.unwrap();
1312 assert_eq!(value, kill);
1313 drop(signal);
1314 });
1315
1316 let result = context.clone().stop(kill, None).await;
1318 assert!(result.is_ok());
1319
1320 let after = context
1322 .with_label("after")
1323 .spawn(move |context| async move {
1324 let value = context.stopped().await.unwrap();
1326 assert_eq!(value, kill);
1327 });
1328
1329 let result = join!(before, after);
1331 assert!(result.0.is_ok());
1332 assert!(result.1.is_ok());
1333 });
1334 }
1335
1336 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1337 where
1338 R::Context: Spawner + Metrics + Clock,
1339 {
1340 let kill = 42;
1341 runner.start(|context| async move {
1342 let (started_tx, mut started_rx) = mpsc::channel(3);
1343 let counter = Arc::new(AtomicU32::new(0));
1344
1345 let task = |cleanup_duration: Duration| {
1348 let context = context.clone();
1349 let counter = counter.clone();
1350 let mut started_tx = started_tx.clone();
1351 context.spawn(move |context| async move {
1352 let mut signal = context.stopped();
1354 started_tx.send(()).await.unwrap();
1355
1356 let value = (&mut signal).await.unwrap();
1358 assert_eq!(value, kill);
1359 context.sleep(cleanup_duration).await;
1360 counter.fetch_add(1, Ordering::SeqCst);
1361
1362 drop(signal);
1364 })
1365 };
1366
1367 let task1 = task(Duration::from_millis(10));
1368 let task2 = task(Duration::from_millis(20));
1369 let task3 = task(Duration::from_millis(30));
1370
1371 for _ in 0..3 {
1373 started_rx.next().await.unwrap();
1374 }
1375
1376 context.stop(kill, None).await.unwrap();
1378 assert_eq!(counter.load(Ordering::SeqCst), 3);
1379
1380 let result = join!(task1, task2, task3);
1382 assert!(result.0.is_ok());
1383 assert!(result.1.is_ok());
1384 assert!(result.2.is_ok());
1385 });
1386 }
1387
1388 fn test_shutdown_timeout<R: Runner>(runner: R)
1389 where
1390 R::Context: Spawner + Metrics + Clock,
1391 {
1392 let kill = 42;
1393 runner.start(|context| async move {
1394 let (started_tx, started_rx) = oneshot::channel();
1396
1397 context.clone().spawn(move |context| async move {
1399 let signal = context.stopped();
1400 started_tx.send(()).unwrap();
1401 pending::<()>().await;
1402 signal.await.unwrap();
1403 });
1404
1405 started_rx.await.unwrap();
1407 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1408
1409 assert!(matches!(result, Err(Error::Timeout)));
1411 });
1412 }
1413
1414 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1415 where
1416 R::Context: Spawner + Metrics + Clock,
1417 {
1418 let kill1 = 42;
1419 let kill2 = 43;
1420
1421 runner.start(|context| async move {
1422 let (started_tx, started_rx) = oneshot::channel();
1423 let counter = Arc::new(AtomicU32::new(0));
1424
1425 let task = context.with_label("blocking_task").spawn({
1427 let counter = counter.clone();
1428 move |context| async move {
1429 let mut signal = context.stopped();
1431 started_tx.send(()).unwrap();
1432
1433 let value = (&mut signal).await.unwrap();
1435 assert_eq!(value, kill1);
1436 context.sleep(Duration::from_millis(50)).await;
1437
1438 counter.fetch_add(1, Ordering::SeqCst);
1440 drop(signal);
1441 }
1442 });
1443
1444 started_rx.await.unwrap();
1446
1447 let stop_task1 = context.clone().stop(kill1, None);
1450 pin_mut!(stop_task1);
1451 let stop_task2 = context.clone().stop(kill2, None);
1452 pin_mut!(stop_task2);
1453
1454 assert!(stop_task1.as_mut().now_or_never().is_none());
1456 assert!(stop_task2.as_mut().now_or_never().is_none());
1457
1458 assert!(stop_task1.await.is_ok());
1460 assert!(stop_task2.await.is_ok());
1461
1462 let sig = context.stopped().await;
1464 assert_eq!(sig.unwrap(), kill1);
1465
1466 let result = task.await;
1468 assert!(result.is_ok());
1469 assert_eq!(counter.load(Ordering::SeqCst), 1);
1470
1471 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1473 });
1474 }
1475
1476 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1477 where
1478 R::Context: Spawner + Metrics,
1479 {
1480 runner.start(|context| async move {
1481 context
1483 .with_label("before")
1484 .spawn(move |context| async move {
1485 let mut signal = context.stopped();
1486 let value = (&mut signal).await.unwrap();
1487
1488 assert_eq!(value, 42);
1490 drop(signal);
1491 });
1492
1493 reschedule().await;
1495 });
1496 }
1497
1498 fn test_spawn_dedicated<R: Runner>(runner: R)
1499 where
1500 R::Context: Spawner,
1501 {
1502 runner.start(|context| async move {
1503 let handle = context.dedicated().spawn(|_| async move { 42 });
1504 assert!(matches!(handle.await, Ok(42)));
1505 });
1506 }
1507
1508 fn test_spawn<R: Runner>(runner: R)
1509 where
1510 R::Context: Spawner + Clock,
1511 {
1512 runner.start(|context| async move {
1513 let child_handle = Arc::new(Mutex::new(None));
1514 let child_handle2 = child_handle.clone();
1515
1516 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1517 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1518 let parent_handle = context.spawn(move |context| async move {
1519 let handle = context.spawn(|_| async {});
1521
1522 *child_handle2.lock().unwrap() = Some(handle);
1524
1525 parent_initialized_tx.send(()).unwrap();
1526
1527 parent_complete_rx.await.unwrap();
1529 });
1530
1531 parent_initialized_rx.await.unwrap();
1533
1534 let child_handle = child_handle.lock().unwrap().take().unwrap();
1536 assert!(child_handle.await.is_ok());
1537
1538 parent_complete_tx.send(()).unwrap();
1540
1541 assert!(parent_handle.await.is_ok());
1543 });
1544 }
1545
1546 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1547 where
1548 R::Context: Spawner + Clock,
1549 {
1550 runner.start(|context| async move {
1551 let child_handle = Arc::new(Mutex::new(None));
1552 let child_handle2 = child_handle.clone();
1553
1554 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1555 let parent_handle = context.spawn(move |context| async move {
1556 let handle = context.spawn(|_| pending::<()>());
1558
1559 *child_handle2.lock().unwrap() = Some(handle);
1561
1562 parent_initialized_tx.send(()).unwrap();
1563
1564 pending::<()>().await
1566 });
1567
1568 parent_initialized_rx.await.unwrap();
1570
1571 parent_handle.abort();
1573 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1574
1575 let child_handle = child_handle.lock().unwrap().take().unwrap();
1577 assert!(matches!(child_handle.await, Err(Error::Closed)));
1578 });
1579 }
1580
1581 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1582 where
1583 R::Context: Spawner + Clock,
1584 {
1585 runner.start(|context| async move {
1586 let child_handle = Arc::new(Mutex::new(None));
1587 let child_handle2 = child_handle.clone();
1588
1589 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1590 let parent_handle = context.spawn(move |context| async move {
1591 let handle = context.spawn(|_| pending::<()>());
1593
1594 *child_handle2.lock().unwrap() = Some(handle);
1596
1597 parent_complete_rx.await.unwrap();
1599 });
1600
1601 parent_complete_tx.send(()).unwrap();
1603
1604 assert!(parent_handle.await.is_ok());
1606
1607 let child_handle = child_handle.lock().unwrap().take().unwrap();
1609 assert!(matches!(child_handle.await, Err(Error::Closed)));
1610 });
1611 }
1612
1613 fn test_spawn_cascading_abort<R: Runner>(runner: R)
1614 where
1615 R::Context: Spawner + Clock,
1616 {
1617 runner.start(|context| async move {
1618 let c0 = context.clone();
1628 let g0 = c0.clone();
1629 let g1 = c0.clone();
1630 let c1 = context.clone();
1631 let g2 = c1.clone();
1632 let g3 = c1.clone();
1633 let c2 = context.clone();
1634 let g4 = c2.clone();
1635 let g5 = c2.clone();
1636
1637 let handles = Arc::new(Mutex::new(Vec::new()));
1639 let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
1640 let root_task = context.spawn({
1641 let handles = handles.clone();
1642 move |_| async move {
1643 for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1644 {
1645 let handle = context.spawn({
1646 let handles = handles.clone();
1647 let mut initialized_tx = initialized_tx.clone();
1648 move |_| async move {
1649 for grandchild in grandchildren {
1650 let handle = grandchild.spawn(|_| async {
1651 pending::<()>().await;
1652 });
1653 handles.lock().unwrap().push(handle);
1654 initialized_tx.send(()).await.unwrap();
1655 }
1656
1657 pending::<()>().await;
1658 }
1659 });
1660 handles.lock().unwrap().push(handle);
1661 initialized_tx.send(()).await.unwrap();
1662 }
1663
1664 pending::<()>().await;
1665 }
1666 });
1667
1668 for _ in 0..9 {
1670 initialized_rx.next().await.unwrap();
1671 }
1672
1673 assert_eq!(handles.lock().unwrap().len(), 9);
1675
1676 root_task.abort();
1678 assert!(matches!(root_task.await, Err(Error::Closed)));
1679
1680 let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1682 for handle in handles {
1683 assert!(matches!(handle.await, Err(Error::Closed)));
1684 }
1685 });
1686 }
1687
1688 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1689 where
1690 R::Context: Spawner + Clock,
1691 {
1692 runner.start(|context| async move {
1693 let (child_started_tx, child_started_rx) = oneshot::channel();
1694 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1695 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1696 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1697 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1698 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1699 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1700
1701 let parent = context.spawn(move |context| async move {
1702 let child_handle = context.clone().spawn(|_| async move {
1704 child_started_tx.send(()).unwrap();
1705 child_complete_rx.await.unwrap();
1707 });
1708 assert!(
1709 child_handle_tx.send(child_handle).is_ok(),
1710 "child handle receiver dropped"
1711 );
1712
1713 let sibling_handle = context.clone().spawn(move |_| async move {
1715 sibling_started_tx.send(()).unwrap();
1716 sibling_complete_rx.await.unwrap();
1718 });
1719 assert!(
1720 sibling_handle_tx.send(sibling_handle).is_ok(),
1721 "sibling handle receiver dropped"
1722 );
1723
1724 parent_complete_rx.await.unwrap();
1726 });
1727
1728 child_started_rx.await.unwrap();
1730 sibling_started_rx.await.unwrap();
1731
1732 sibling_complete_tx.send(()).unwrap();
1734 assert!(sibling_handle_rx.await.is_ok());
1735
1736 child_complete_tx.send(()).unwrap();
1738 assert!(child_handle_rx.await.is_ok());
1739
1740 parent_complete_tx.send(()).unwrap();
1742 assert!(parent.await.is_ok());
1743 });
1744 }
1745
1746 fn test_spawn_clone_chain<R: Runner>(runner: R)
1747 where
1748 R::Context: Spawner + Clock,
1749 {
1750 runner.start(|context| async move {
1751 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1752 let (child_started_tx, child_started_rx) = oneshot::channel();
1753 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1754 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1755 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1756
1757 let parent = context.clone().spawn({
1758 move |context| async move {
1759 let child = context.clone().spawn({
1760 move |context| async move {
1761 let grandchild = context.clone().spawn({
1762 move |_| async move {
1763 grandchild_started_tx.send(()).unwrap();
1764 pending::<()>().await;
1765 }
1766 });
1767 assert!(
1768 grandchild_handle_tx.send(grandchild).is_ok(),
1769 "grandchild handle receiver dropped"
1770 );
1771 child_started_tx.send(()).unwrap();
1772 pending::<()>().await;
1773 }
1774 });
1775 assert!(
1776 child_handle_tx.send(child).is_ok(),
1777 "child handle receiver dropped"
1778 );
1779 parent_started_tx.send(()).unwrap();
1780 pending::<()>().await;
1781 }
1782 });
1783
1784 parent_started_rx.await.unwrap();
1785 child_started_rx.await.unwrap();
1786 grandchild_started_rx.await.unwrap();
1787
1788 let child_handle = child_handle_rx.await.unwrap();
1789 let grandchild_handle = grandchild_handle_rx.await.unwrap();
1790
1791 parent.abort();
1792 assert!(parent.await.is_err());
1793
1794 assert!(child_handle.await.is_err());
1795 assert!(grandchild_handle.await.is_err());
1796 });
1797 }
1798
1799 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1800 where
1801 R::Context: Spawner + Clock,
1802 {
1803 runner.start(|context| async move {
1804 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1805 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1806
1807 let parent = context.clone().spawn({
1808 move |context| async move {
1809 let clone1 = context.clone();
1810 let clone2 = clone1.clone();
1811 let clone3 = clone2.clone();
1812
1813 let leaf = clone3.clone().spawn({
1814 move |_| async move {
1815 leaf_started_tx.send(()).unwrap();
1816 pending::<()>().await;
1817 }
1818 });
1819
1820 leaf_handle_tx
1821 .send(leaf)
1822 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1823 pending::<()>().await;
1824 }
1825 });
1826
1827 leaf_started_rx.await.unwrap();
1828 let leaf_handle = leaf_handle_rx.await.unwrap();
1829
1830 parent.abort();
1831 assert!(parent.await.is_err());
1832 assert!(leaf_handle.await.is_err());
1833 });
1834 }
1835
1836 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1837 where
1838 R::Context: Spawner,
1839 {
1840 runner.start(|context| async move {
1841 let context = if dedicated {
1842 context.dedicated()
1843 } else {
1844 context.shared(true)
1845 };
1846
1847 let handle = context.spawn(|_| async move { 42 });
1848 let result = handle.await;
1849 assert!(matches!(result, Ok(42)));
1850 });
1851 }
1852
1853 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1854 where
1855 R::Context: Spawner + Clock,
1856 {
1857 runner.start(|context| async move {
1858 let context = if dedicated {
1859 context.dedicated()
1860 } else {
1861 context.shared(true)
1862 };
1863
1864 context.clone().spawn(|_| async move {
1865 panic!("blocking task panicked");
1866 });
1867
1868 loop {
1870 context.sleep(Duration::from_millis(100)).await;
1871 }
1872 });
1873 }
1874
1875 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1876 where
1877 R::Context: Spawner + Clock,
1878 {
1879 let result: Result<(), Error> = runner.start(|context| async move {
1880 let context = if dedicated {
1881 context.dedicated()
1882 } else {
1883 context.shared(true)
1884 };
1885
1886 let handle = context.clone().spawn(|_| async move {
1887 panic!("blocking task panicked");
1888 });
1889 handle.await
1890 });
1891 assert!(matches!(result, Err(Error::Exited)));
1892 }
1893
1894 fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1895 runner.start(|_| async move {
1896 let dropper = Arc::new(());
1898 let executor = deterministic::Runner::default();
1899 executor.start({
1900 let dropper = dropper.clone();
1901 move |context| async move {
1902 let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1904 let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1905 let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1906
1907 context.with_label("task1").spawn({
1909 let mut setup_tx = setup_tx.clone();
1910 let dropper = dropper.clone();
1911 move |_| async move {
1912 tx2.send(()).await.unwrap();
1914 rx1.next().await.unwrap();
1915 setup_tx.send(()).await.unwrap();
1916
1917 while rx1.next().await.is_some() {}
1919 drop(tx2);
1920 drop(dropper);
1921 }
1922 });
1923
1924 context.with_label("task2").spawn(move |_| async move {
1926 tx1.send(()).await.unwrap();
1928 rx2.next().await.unwrap();
1929 setup_tx.send(()).await.unwrap();
1930
1931 while rx2.next().await.is_some() {}
1933 drop(tx1);
1934 drop(dropper);
1935 });
1936
1937 setup_rx.next().await.unwrap();
1939 setup_rx.next().await.unwrap();
1940 }
1941 });
1942
1943 Arc::try_unwrap(dropper).expect("references remaining");
1945 });
1946 }
1947
1948 fn test_late_waker<R: Runner>(runner: R)
1949 where
1950 R::Context: Metrics + Spawner,
1951 {
1952 struct CaptureWaker {
1955 tx: Option<oneshot::Sender<Waker>>,
1956 sent: bool,
1957 }
1958 impl Future for CaptureWaker {
1959 type Output = ();
1960 fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1961 if !self.sent {
1962 if let Some(tx) = self.tx.take() {
1963 let _ = tx.send(cx.waker().clone());
1965 }
1966 self.sent = true;
1967 }
1968 Poll::Pending
1969 }
1970 }
1971
1972 struct WakeOnDrop(Option<Waker>);
1974 impl Drop for WakeOnDrop {
1975 fn drop(&mut self) {
1976 if let Some(w) = self.0.take() {
1977 w.wake_by_ref();
1978 }
1979 }
1980 }
1981
1982 let holder = runner.start(|context| async move {
1984 let (tx, rx) = oneshot::channel::<Waker>();
1986
1987 context
1989 .with_label("capture_waker")
1990 .spawn(move |_| async move {
1991 CaptureWaker {
1992 tx: Some(tx),
1993 sent: false,
1994 }
1995 .await;
1996 });
1997
1998 utils::reschedule().await;
2000
2001 let waker = rx.await.expect("waker not received");
2003
2004 WakeOnDrop(Some(waker))
2006 });
2007
2008 drop(holder);
2011 }
2012
2013 fn test_metrics<R: Runner>(runner: R)
2014 where
2015 R::Context: Metrics,
2016 {
2017 runner.start(|context| async move {
2018 assert_eq!(context.label(), "");
2020
2021 let counter = Counter::<u64>::default();
2023 context.register("test", "test", counter.clone());
2024
2025 counter.inc();
2027
2028 let buffer = context.encode();
2030 assert!(buffer.contains("test_total 1"));
2031
2032 let context = context.with_label("nested");
2034 let nested_counter = Counter::<u64>::default();
2035 context.register("test", "test", nested_counter.clone());
2036
2037 nested_counter.inc();
2039
2040 let buffer = context.encode();
2042 assert!(buffer.contains("nested_test_total 1"));
2043 assert!(buffer.contains("test_total 1"));
2044 });
2045 }
2046
2047 fn test_metrics_label<R: Runner>(runner: R)
2048 where
2049 R::Context: Metrics,
2050 {
2051 runner.start(|context| async move {
2052 context.with_label(METRICS_PREFIX);
2053 })
2054 }
2055
2056 fn test_metrics_label_empty<R: Runner>(runner: R)
2057 where
2058 R::Context: Metrics,
2059 {
2060 runner.start(|context| async move {
2061 context.with_label("");
2062 })
2063 }
2064
2065 fn test_metrics_label_invalid_first_char<R: Runner>(runner: R)
2066 where
2067 R::Context: Metrics,
2068 {
2069 runner.start(|context| async move {
2070 context.with_label("1invalid");
2071 })
2072 }
2073
2074 fn test_metrics_label_invalid_char<R: Runner>(runner: R)
2075 where
2076 R::Context: Metrics,
2077 {
2078 runner.start(|context| async move {
2079 context.with_label("invalid-label");
2080 })
2081 }
2082
2083 #[test]
2084 fn test_deterministic_future() {
2085 let runner = deterministic::Runner::default();
2086 test_error_future(runner);
2087 }
2088
2089 #[test]
2090 fn test_deterministic_clock_sleep() {
2091 let executor = deterministic::Runner::default();
2092 test_clock_sleep(executor);
2093 }
2094
2095 #[test]
2096 fn test_deterministic_clock_sleep_until() {
2097 let executor = deterministic::Runner::default();
2098 test_clock_sleep_until(executor);
2099 }
2100
2101 #[test]
2102 fn test_deterministic_clock_timeout() {
2103 let executor = deterministic::Runner::default();
2104 test_clock_timeout(executor);
2105 }
2106
2107 #[test]
2108 fn test_deterministic_root_finishes() {
2109 let executor = deterministic::Runner::default();
2110 test_root_finishes(executor);
2111 }
2112
2113 #[test]
2114 fn test_deterministic_spawn_after_abort() {
2115 let executor = deterministic::Runner::default();
2116 test_spawn_after_abort(executor);
2117 }
2118
2119 #[test]
2120 fn test_deterministic_spawn_abort() {
2121 let executor = deterministic::Runner::default();
2122 test_spawn_abort(executor, false, false);
2123 }
2124
2125 #[test]
2126 #[should_panic(expected = "blah")]
2127 fn test_deterministic_panic_aborts_root() {
2128 let runner = deterministic::Runner::default();
2129 test_panic_aborts_root(runner);
2130 }
2131
2132 #[test]
2133 #[should_panic(expected = "blah")]
2134 fn test_deterministic_panic_aborts_root_caught() {
2135 let cfg = deterministic::Config::default().with_catch_panics(true);
2136 let runner = deterministic::Runner::new(cfg);
2137 test_panic_aborts_root(runner);
2138 }
2139
2140 #[test]
2141 #[should_panic(expected = "blah")]
2142 fn test_deterministic_panic_aborts_spawn() {
2143 let executor = deterministic::Runner::default();
2144 test_panic_aborts_spawn(executor);
2145 }
2146
2147 #[test]
2148 fn test_deterministic_panic_aborts_spawn_caught() {
2149 let cfg = deterministic::Config::default().with_catch_panics(true);
2150 let executor = deterministic::Runner::new(cfg);
2151 test_panic_aborts_spawn_caught(executor);
2152 }
2153
2154 #[test]
2155 #[should_panic(expected = "boom")]
2156 fn test_deterministic_multiple_panics() {
2157 let executor = deterministic::Runner::default();
2158 test_multiple_panics(executor);
2159 }
2160
2161 #[test]
2162 fn test_deterministic_multiple_panics_caught() {
2163 let cfg = deterministic::Config::default().with_catch_panics(true);
2164 let executor = deterministic::Runner::new(cfg);
2165 test_multiple_panics_caught(executor);
2166 }
2167
2168 #[test]
2169 fn test_deterministic_select() {
2170 let executor = deterministic::Runner::default();
2171 test_select(executor);
2172 }
2173
2174 #[test]
2175 fn test_deterministic_select_loop() {
2176 let executor = deterministic::Runner::default();
2177 test_select_loop(executor);
2178 }
2179
2180 #[test]
2181 fn test_deterministic_storage_operations() {
2182 let executor = deterministic::Runner::default();
2183 test_storage_operations(executor);
2184 }
2185
2186 #[test]
2187 fn test_deterministic_blob_read_write() {
2188 let executor = deterministic::Runner::default();
2189 test_blob_read_write(executor);
2190 }
2191
2192 #[test]
2193 fn test_deterministic_blob_resize() {
2194 let executor = deterministic::Runner::default();
2195 test_blob_resize(executor);
2196 }
2197
2198 #[test]
2199 fn test_deterministic_many_partition_read_write() {
2200 let executor = deterministic::Runner::default();
2201 test_many_partition_read_write(executor);
2202 }
2203
2204 #[test]
2205 fn test_deterministic_blob_read_past_length() {
2206 let executor = deterministic::Runner::default();
2207 test_blob_read_past_length(executor);
2208 }
2209
2210 #[test]
2211 fn test_deterministic_blob_clone_and_concurrent_read() {
2212 let executor = deterministic::Runner::default();
2214 test_blob_clone_and_concurrent_read(executor);
2215 }
2216
2217 #[test]
2218 fn test_deterministic_shutdown() {
2219 let executor = deterministic::Runner::default();
2220 test_shutdown(executor);
2221 }
2222
2223 #[test]
2224 fn test_deterministic_shutdown_multiple_signals() {
2225 let executor = deterministic::Runner::default();
2226 test_shutdown_multiple_signals(executor);
2227 }
2228
2229 #[test]
2230 fn test_deterministic_shutdown_timeout() {
2231 let executor = deterministic::Runner::default();
2232 test_shutdown_timeout(executor);
2233 }
2234
2235 #[test]
2236 fn test_deterministic_shutdown_multiple_stop_calls() {
2237 let executor = deterministic::Runner::default();
2238 test_shutdown_multiple_stop_calls(executor);
2239 }
2240
2241 #[test]
2242 fn test_deterministic_unfulfilled_shutdown() {
2243 let executor = deterministic::Runner::default();
2244 test_unfulfilled_shutdown(executor);
2245 }
2246
2247 #[test]
2248 fn test_deterministic_spawn_dedicated() {
2249 let executor = deterministic::Runner::default();
2250 test_spawn_dedicated(executor);
2251 }
2252
2253 #[test]
2254 fn test_deterministic_spawn() {
2255 let runner = deterministic::Runner::default();
2256 test_spawn(runner);
2257 }
2258
2259 #[test]
2260 fn test_deterministic_spawn_abort_on_parent_abort() {
2261 let runner = deterministic::Runner::default();
2262 test_spawn_abort_on_parent_abort(runner);
2263 }
2264
2265 #[test]
2266 fn test_deterministic_spawn_abort_on_parent_completion() {
2267 let runner = deterministic::Runner::default();
2268 test_spawn_abort_on_parent_completion(runner);
2269 }
2270
2271 #[test]
2272 fn test_deterministic_spawn_cascading_abort() {
2273 let runner = deterministic::Runner::default();
2274 test_spawn_cascading_abort(runner);
2275 }
2276
2277 #[test]
2278 fn test_deterministic_child_survives_sibling_completion() {
2279 let runner = deterministic::Runner::default();
2280 test_child_survives_sibling_completion(runner);
2281 }
2282
2283 #[test]
2284 fn test_deterministic_spawn_clone_chain() {
2285 let runner = deterministic::Runner::default();
2286 test_spawn_clone_chain(runner);
2287 }
2288
2289 #[test]
2290 fn test_deterministic_spawn_sparse_clone_chain() {
2291 let runner = deterministic::Runner::default();
2292 test_spawn_sparse_clone_chain(runner);
2293 }
2294
2295 #[test]
2296 fn test_deterministic_spawn_blocking() {
2297 for dedicated in [false, true] {
2298 let executor = deterministic::Runner::default();
2299 test_spawn_blocking(executor, dedicated);
2300 }
2301 }
2302
2303 #[test]
2304 #[should_panic(expected = "blocking task panicked")]
2305 fn test_deterministic_spawn_blocking_panic() {
2306 for dedicated in [false, true] {
2307 let executor = deterministic::Runner::default();
2308 test_spawn_blocking_panic(executor, dedicated);
2309 }
2310 }
2311
2312 #[test]
2313 fn test_deterministic_spawn_blocking_panic_caught() {
2314 for dedicated in [false, true] {
2315 let cfg = deterministic::Config::default().with_catch_panics(true);
2316 let executor = deterministic::Runner::new(cfg);
2317 test_spawn_blocking_panic_caught(executor, dedicated);
2318 }
2319 }
2320
2321 #[test]
2322 fn test_deterministic_spawn_blocking_abort() {
2323 for (dedicated, blocking) in [(false, true), (true, false)] {
2324 let executor = deterministic::Runner::default();
2325 test_spawn_abort(executor, dedicated, blocking);
2326 }
2327 }
2328
2329 #[test]
2330 fn test_deterministic_circular_reference_prevents_cleanup() {
2331 let executor = deterministic::Runner::default();
2332 test_circular_reference_prevents_cleanup(executor);
2333 }
2334
2335 #[test]
2336 fn test_deterministic_late_waker() {
2337 let executor = deterministic::Runner::default();
2338 test_late_waker(executor);
2339 }
2340
2341 #[test]
2342 fn test_deterministic_metrics() {
2343 let executor = deterministic::Runner::default();
2344 test_metrics(executor);
2345 }
2346
2347 #[test]
2348 #[should_panic]
2349 fn test_deterministic_metrics_label() {
2350 let executor = deterministic::Runner::default();
2351 test_metrics_label(executor);
2352 }
2353
2354 #[test]
2355 #[should_panic(expected = "label must start with [a-zA-Z]")]
2356 fn test_deterministic_metrics_label_empty() {
2357 let executor = deterministic::Runner::default();
2358 test_metrics_label_empty(executor);
2359 }
2360
2361 #[test]
2362 #[should_panic(expected = "label must start with [a-zA-Z]")]
2363 fn test_deterministic_metrics_label_invalid_first_char() {
2364 let executor = deterministic::Runner::default();
2365 test_metrics_label_invalid_first_char(executor);
2366 }
2367
2368 #[test]
2369 #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2370 fn test_deterministic_metrics_label_invalid_char() {
2371 let executor = deterministic::Runner::default();
2372 test_metrics_label_invalid_char(executor);
2373 }
2374
2375 #[test_collect_traces]
2376 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2377 let executor = deterministic::Runner::new(deterministic::Config::default());
2378 executor.start(|context| async move {
2379 context
2380 .with_label("test")
2381 .instrumented()
2382 .spawn(|context| async move {
2383 tracing::info!(field = "test field", "test log");
2384
2385 context
2386 .with_label("inner")
2387 .instrumented()
2388 .spawn(|_| async move {
2389 tracing::info!("inner log");
2390 })
2391 .await
2392 .unwrap();
2393 })
2394 .await
2395 .unwrap();
2396 });
2397
2398 let info_traces = traces.get_by_level(Level::INFO);
2399 assert_eq!(info_traces.len(), 2);
2400
2401 info_traces
2403 .expect_event_at_index(0, |event| {
2404 event.metadata.expect_content_exact("test log")?;
2405 event.metadata.expect_field_count(1)?;
2406 event.metadata.expect_field_exact("field", "test field")?;
2407 event.expect_span_count(1)?;
2408 event.expect_span_at_index(0, |span| {
2409 span.expect_content_exact("task")?;
2410 span.expect_field_count(1)?;
2411 span.expect_field_exact("name", "test")
2412 })
2413 })
2414 .unwrap();
2415
2416 info_traces
2417 .expect_event_at_index(1, |event| {
2418 event.metadata.expect_content_exact("inner log")?;
2419 event.metadata.expect_field_count(0)?;
2420 event.expect_span_count(1)?;
2421 event.expect_span_at_index(0, |span| {
2422 span.expect_content_exact("task")?;
2423 span.expect_field_count(1)?;
2424 span.expect_field_exact("name", "test_inner")
2425 })
2426 })
2427 .unwrap();
2428 }
2429
2430 #[test]
2431 fn test_deterministic_resolver() {
2432 let executor = deterministic::Runner::default();
2433 executor.start(|context| async move {
2434 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
2436 let ip2: IpAddr = "192.168.1.2".parse().unwrap();
2437 context.resolver_register("example.com", Some(vec![ip1, ip2]));
2438
2439 let addrs = context.resolve("example.com").await.unwrap();
2441 assert_eq!(addrs, vec![ip1, ip2]);
2442
2443 let result = context.resolve("unknown.com").await;
2445 assert!(matches!(result, Err(Error::ResolveFailed(_))));
2446
2447 context.resolver_register("example.com", None);
2449 let result = context.resolve("example.com").await;
2450 assert!(matches!(result, Err(Error::ResolveFailed(_))));
2451 });
2452 }
2453
2454 #[test]
2455 fn test_tokio_error_future() {
2456 let runner = tokio::Runner::default();
2457 test_error_future(runner);
2458 }
2459
2460 #[test]
2461 fn test_tokio_clock_sleep() {
2462 let executor = tokio::Runner::default();
2463 test_clock_sleep(executor);
2464 }
2465
2466 #[test]
2467 fn test_tokio_clock_sleep_until() {
2468 let executor = tokio::Runner::default();
2469 test_clock_sleep_until(executor);
2470 }
2471
2472 #[test]
2473 fn test_tokio_clock_timeout() {
2474 let executor = tokio::Runner::default();
2475 test_clock_timeout(executor);
2476 }
2477
2478 #[test]
2479 fn test_tokio_root_finishes() {
2480 let executor = tokio::Runner::default();
2481 test_root_finishes(executor);
2482 }
2483
2484 #[test]
2485 fn test_tokio_spawn_after_abort() {
2486 let executor = tokio::Runner::default();
2487 test_spawn_after_abort(executor);
2488 }
2489
2490 #[test]
2491 fn test_tokio_spawn_abort() {
2492 let executor = tokio::Runner::default();
2493 test_spawn_abort(executor, false, false);
2494 }
2495
2496 #[test]
2497 #[should_panic(expected = "blah")]
2498 fn test_tokio_panic_aborts_root() {
2499 let executor = tokio::Runner::default();
2500 test_panic_aborts_root(executor);
2501 }
2502
2503 #[test]
2504 #[should_panic(expected = "blah")]
2505 fn test_tokio_panic_aborts_root_caught() {
2506 let cfg = tokio::Config::default().with_catch_panics(true);
2507 let executor = tokio::Runner::new(cfg);
2508 test_panic_aborts_root(executor);
2509 }
2510
2511 #[test]
2512 #[should_panic(expected = "blah")]
2513 fn test_tokio_panic_aborts_spawn() {
2514 let executor = tokio::Runner::default();
2515 test_panic_aborts_spawn(executor);
2516 }
2517
2518 #[test]
2519 fn test_tokio_panic_aborts_spawn_caught() {
2520 let cfg = tokio::Config::default().with_catch_panics(true);
2521 let executor = tokio::Runner::new(cfg);
2522 test_panic_aborts_spawn_caught(executor);
2523 }
2524
2525 #[test]
2526 #[should_panic(expected = "boom")]
2527 fn test_tokio_multiple_panics() {
2528 let executor = tokio::Runner::default();
2529 test_multiple_panics(executor);
2530 }
2531
2532 #[test]
2533 fn test_tokio_multiple_panics_caught() {
2534 let cfg = tokio::Config::default().with_catch_panics(true);
2535 let executor = tokio::Runner::new(cfg);
2536 test_multiple_panics_caught(executor);
2537 }
2538
2539 #[test]
2540 fn test_tokio_select() {
2541 let executor = tokio::Runner::default();
2542 test_select(executor);
2543 }
2544
2545 #[test]
2546 fn test_tokio_select_loop() {
2547 let executor = tokio::Runner::default();
2548 test_select_loop(executor);
2549 }
2550
2551 #[test]
2552 fn test_tokio_storage_operations() {
2553 let executor = tokio::Runner::default();
2554 test_storage_operations(executor);
2555 }
2556
2557 #[test]
2558 fn test_tokio_blob_read_write() {
2559 let executor = tokio::Runner::default();
2560 test_blob_read_write(executor);
2561 }
2562
2563 #[test]
2564 fn test_tokio_blob_resize() {
2565 let executor = tokio::Runner::default();
2566 test_blob_resize(executor);
2567 }
2568
2569 #[test]
2570 fn test_tokio_many_partition_read_write() {
2571 let executor = tokio::Runner::default();
2572 test_many_partition_read_write(executor);
2573 }
2574
2575 #[test]
2576 fn test_tokio_blob_read_past_length() {
2577 let executor = tokio::Runner::default();
2578 test_blob_read_past_length(executor);
2579 }
2580
2581 #[test]
2582 fn test_tokio_blob_clone_and_concurrent_read() {
2583 let executor = tokio::Runner::default();
2585 test_blob_clone_and_concurrent_read(executor);
2586 }
2587
2588 #[test]
2589 fn test_tokio_shutdown() {
2590 let executor = tokio::Runner::default();
2591 test_shutdown(executor);
2592 }
2593
2594 #[test]
2595 fn test_tokio_shutdown_multiple_signals() {
2596 let executor = tokio::Runner::default();
2597 test_shutdown_multiple_signals(executor);
2598 }
2599
2600 #[test]
2601 fn test_tokio_shutdown_timeout() {
2602 let executor = tokio::Runner::default();
2603 test_shutdown_timeout(executor);
2604 }
2605
2606 #[test]
2607 fn test_tokio_shutdown_multiple_stop_calls() {
2608 let executor = tokio::Runner::default();
2609 test_shutdown_multiple_stop_calls(executor);
2610 }
2611
2612 #[test]
2613 fn test_tokio_unfulfilled_shutdown() {
2614 let executor = tokio::Runner::default();
2615 test_unfulfilled_shutdown(executor);
2616 }
2617
2618 #[test]
2619 fn test_tokio_spawn_dedicated() {
2620 let executor = tokio::Runner::default();
2621 test_spawn_dedicated(executor);
2622 }
2623
2624 #[test]
2625 fn test_tokio_spawn() {
2626 let runner = tokio::Runner::default();
2627 test_spawn(runner);
2628 }
2629
2630 #[test]
2631 fn test_tokio_spawn_abort_on_parent_abort() {
2632 let runner = tokio::Runner::default();
2633 test_spawn_abort_on_parent_abort(runner);
2634 }
2635
2636 #[test]
2637 fn test_tokio_spawn_abort_on_parent_completion() {
2638 let runner = tokio::Runner::default();
2639 test_spawn_abort_on_parent_completion(runner);
2640 }
2641
2642 #[test]
2643 fn test_tokio_spawn_cascading_abort() {
2644 let runner = tokio::Runner::default();
2645 test_spawn_cascading_abort(runner);
2646 }
2647
2648 #[test]
2649 fn test_tokio_child_survives_sibling_completion() {
2650 let runner = tokio::Runner::default();
2651 test_child_survives_sibling_completion(runner);
2652 }
2653
2654 #[test]
2655 fn test_tokio_spawn_clone_chain() {
2656 let runner = tokio::Runner::default();
2657 test_spawn_clone_chain(runner);
2658 }
2659
2660 #[test]
2661 fn test_tokio_spawn_sparse_clone_chain() {
2662 let runner = tokio::Runner::default();
2663 test_spawn_sparse_clone_chain(runner);
2664 }
2665
2666 #[test]
2667 fn test_tokio_spawn_blocking() {
2668 for dedicated in [false, true] {
2669 let executor = tokio::Runner::default();
2670 test_spawn_blocking(executor, dedicated);
2671 }
2672 }
2673
2674 #[test]
2675 #[should_panic(expected = "blocking task panicked")]
2676 fn test_tokio_spawn_blocking_panic() {
2677 for dedicated in [false, true] {
2678 let executor = tokio::Runner::default();
2679 test_spawn_blocking_panic(executor, dedicated);
2680 }
2681 }
2682
2683 #[test]
2684 fn test_tokio_spawn_blocking_panic_caught() {
2685 for dedicated in [false, true] {
2686 let cfg = tokio::Config::default().with_catch_panics(true);
2687 let executor = tokio::Runner::new(cfg);
2688 test_spawn_blocking_panic_caught(executor, dedicated);
2689 }
2690 }
2691
2692 #[test]
2693 fn test_tokio_spawn_blocking_abort() {
2694 for (dedicated, blocking) in [(false, true), (true, false)] {
2695 let executor = tokio::Runner::default();
2696 test_spawn_abort(executor, dedicated, blocking);
2697 }
2698 }
2699
2700 #[test]
2701 fn test_tokio_circular_reference_prevents_cleanup() {
2702 let executor = tokio::Runner::default();
2703 test_circular_reference_prevents_cleanup(executor);
2704 }
2705
2706 #[test]
2707 fn test_tokio_late_waker() {
2708 let executor = tokio::Runner::default();
2709 test_late_waker(executor);
2710 }
2711
2712 #[test]
2713 fn test_tokio_metrics() {
2714 let executor = tokio::Runner::default();
2715 test_metrics(executor);
2716 }
2717
2718 #[test]
2719 #[should_panic]
2720 fn test_tokio_metrics_label() {
2721 let executor = tokio::Runner::default();
2722 test_metrics_label(executor);
2723 }
2724
2725 #[test]
2726 #[should_panic(expected = "label must start with [a-zA-Z]")]
2727 fn test_tokio_metrics_label_empty() {
2728 let executor = tokio::Runner::default();
2729 test_metrics_label_empty(executor);
2730 }
2731
2732 #[test]
2733 #[should_panic(expected = "label must start with [a-zA-Z]")]
2734 fn test_tokio_metrics_label_invalid_first_char() {
2735 let executor = tokio::Runner::default();
2736 test_metrics_label_invalid_first_char(executor);
2737 }
2738
2739 #[test]
2740 #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2741 fn test_tokio_metrics_label_invalid_char() {
2742 let executor = tokio::Runner::default();
2743 test_metrics_label_invalid_char(executor);
2744 }
2745
2746 #[test]
2747 fn test_tokio_process_rss_metric() {
2748 let executor = tokio::Runner::default();
2749 executor.start(|context| async move {
2750 loop {
2751 let metrics = context.encode();
2753 if !metrics.contains("runtime_process_rss") {
2754 context.sleep(Duration::from_millis(100)).await;
2755 continue;
2756 }
2757
2758 for line in metrics.lines() {
2760 if line.starts_with("runtime_process_rss")
2761 && !line.starts_with("runtime_process_rss{")
2762 {
2763 let parts: Vec<&str> = line.split_whitespace().collect();
2764 if parts.len() >= 2 {
2765 let rss_value: i64 =
2766 parts[1].parse().expect("Failed to parse RSS value");
2767 if rss_value > 0 {
2768 return;
2769 }
2770 }
2771 }
2772 }
2773 }
2774 });
2775 }
2776
2777 #[test]
2778 fn test_tokio_telemetry() {
2779 let executor = tokio::Runner::default();
2780 executor.start(|context| async move {
2781 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2783
2784 tokio::telemetry::init(
2786 context.with_label("metrics"),
2787 tokio::telemetry::Logging {
2788 level: Level::INFO,
2789 json: false,
2790 },
2791 Some(address),
2792 None,
2793 );
2794
2795 let counter: Counter<u64> = Counter::default();
2797 context.register("test_counter", "Test counter", counter.clone());
2798 counter.inc();
2799
2800 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
2802 let mut line = Vec::new();
2803 loop {
2804 let mut byte = [0u8; 1];
2805 stream.recv(&mut byte[..]).await?;
2806 if byte[0] == b'\n' {
2807 if line.last() == Some(&b'\r') {
2808 line.pop(); }
2810 break;
2811 }
2812 line.push(byte[0]);
2813 }
2814 String::from_utf8(line).map_err(|_| Error::ReadFailed)
2815 }
2816
2817 async fn read_headers<St: Stream>(
2818 stream: &mut St,
2819 ) -> Result<HashMap<String, String>, Error> {
2820 let mut headers = HashMap::new();
2821 loop {
2822 let line = read_line(stream).await?;
2823 if line.is_empty() {
2824 break;
2825 }
2826 let parts: Vec<&str> = line.splitn(2, ": ").collect();
2827 if parts.len() == 2 {
2828 headers.insert(parts[0].to_string(), parts[1].to_string());
2829 }
2830 }
2831 Ok(headers)
2832 }
2833
2834 async fn read_body<St: Stream>(
2835 stream: &mut St,
2836 content_length: usize,
2837 ) -> Result<String, Error> {
2838 let mut read = vec![0; content_length];
2839 stream.recv(&mut read[..]).await?;
2840 String::from_utf8(read).map_err(|_| Error::ReadFailed)
2841 }
2842
2843 let client_handle = context
2845 .with_label("client")
2846 .spawn(move |context| async move {
2847 let (mut sink, mut stream) = loop {
2848 match context.dial(address).await {
2849 Ok((sink, stream)) => break (sink, stream),
2850 Err(e) => {
2851 error!(err =?e, "failed to connect");
2853 context.sleep(Duration::from_millis(10)).await;
2854 }
2855 }
2856 };
2857
2858 let request = format!(
2860 "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2861 );
2862 sink.send(Bytes::from(request)).await.unwrap();
2863
2864 let status_line = read_line(&mut stream).await.unwrap();
2866 assert_eq!(status_line, "HTTP/1.1 200 OK");
2867
2868 let headers = read_headers(&mut stream).await.unwrap();
2870 println!("Headers: {headers:?}");
2871 let content_length = headers
2872 .get("content-length")
2873 .unwrap()
2874 .parse::<usize>()
2875 .unwrap();
2876
2877 let body = read_body(&mut stream, content_length).await.unwrap();
2879 assert!(body.contains("test_counter_total 1"));
2880 });
2881
2882 client_handle.await.unwrap();
2884 });
2885 }
2886
2887 #[test]
2888 fn test_tokio_resolver() {
2889 let executor = tokio::Runner::default();
2890 executor.start(|context| async move {
2891 let addrs = context.resolve("localhost").await.unwrap();
2892 assert!(!addrs.is_empty());
2893 for addr in addrs {
2894 assert!(
2895 addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
2896 || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
2897 );
2898 }
2899 });
2900 }
2901
2902 #[test]
2903 fn test_create_pool_tokio() {
2904 let executor = tokio::Runner::default();
2905 executor.start(|context| async move {
2906 let pool = context.with_label("pool").create_pool(NZUsize!(4)).unwrap();
2908
2909 let v: Vec<_> = (0..10000).collect();
2911
2912 pool.install(|| {
2914 assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
2915 });
2916 });
2917 }
2918
2919 #[test]
2920 fn test_create_pool_deterministic() {
2921 let executor = deterministic::Runner::default();
2922 executor.start(|context| async move {
2923 let pool = context.with_label("pool").create_pool(NZUsize!(4)).unwrap();
2925
2926 let v: Vec<_> = (0..10000).collect();
2928
2929 pool.install(|| {
2931 assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
2932 });
2933 });
2934 }
2935}