1#![doc(
20 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21 html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
33stability_scope!(ALPHA {
34 pub mod deterministic;
35 pub mod mocks;
36});
37stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
38 pub mod benchmarks;
39});
40stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
41 mod iouring;
42});
43stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
44 pub mod tokio;
45});
46stability_scope!(BETA {
47 use commonware_macros::select;
48 use commonware_parallel::{Rayon, ThreadPool};
49 use iobuf::PoolError;
50 use prometheus_client::registry::Metric;
51 use rayon::ThreadPoolBuildError;
52 use std::{
53 future::Future,
54 io::Error as IoError,
55 net::SocketAddr,
56 num::NonZeroUsize,
57 time::{Duration, SystemTime},
58 };
59 use thiserror::Error;
60
/// Crate-internal prefix string for metrics.
///
/// NOTE(review): the code that consumes this value is not visible here;
/// presumably it is prepended to registered metric names — confirm.
pub(crate) const METRICS_PREFIX: &str = "runtime";
63
64 pub use bytes::{Buf, BufMut};
66 pub use governor::Quota;
68
69 pub mod iobuf;
70 pub use iobuf::{
71 BufferPool, BufferPoolConfig, BufferPoolThreadCache, Builder as IoBufsBuilder, IoBuf,
72 IoBufMut, IoBufs, IoBufsMut,
73 };
74
75 pub mod utils;
76 pub use utils::*;
77
78 pub mod telemetry;
79
/// Blob version used when a caller does not ask for a specific one.
///
/// `Storage::open` opens blobs with the single-version range
/// `DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION`.
pub const DEFAULT_BLOB_VERSION: u16 = 0;
82
83 #[derive(Error, Debug)]
85 pub enum Error {
86 #[error("exited")]
87 Exited,
88 #[error("closed")]
89 Closed,
90 #[error("timeout")]
91 Timeout,
92 #[error("bind failed")]
93 BindFailed,
94 #[error("connection failed")]
95 ConnectionFailed,
96 #[error("write failed")]
97 WriteFailed,
98 #[error("read failed")]
99 ReadFailed,
100 #[error("send failed")]
101 SendFailed,
102 #[error("recv failed")]
103 RecvFailed,
104 #[error("dns resolution failed: {0}")]
105 ResolveFailed(String),
106 #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
107 PartitionNameInvalid(String),
108 #[error("partition creation failed: {0}")]
109 PartitionCreationFailed(String),
110 #[error("partition missing: {0}")]
111 PartitionMissing(String),
112 #[error("partition corrupt: {0}")]
113 PartitionCorrupt(String),
114 #[error("blob open failed: {0}/{1} error: {2}")]
115 BlobOpenFailed(String, String, IoError),
116 #[error("blob missing: {0}/{1}")]
117 BlobMissing(String, String),
118 #[error("blob resize failed: {0}/{1} error: {2}")]
119 BlobResizeFailed(String, String, IoError),
120 #[error("blob sync failed: {0}/{1} error: {2}")]
121 BlobSyncFailed(String, String, IoError),
122 #[error("blob insufficient length")]
123 BlobInsufficientLength,
124 #[error("blob corrupt: {0}/{1} reason: {2}")]
125 BlobCorrupt(String, String, String),
126 #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
127 BlobVersionMismatch {
128 expected: std::ops::RangeInclusive<u16>,
129 found: u16,
130 },
131 #[error("invalid or missing checksum")]
132 InvalidChecksum,
133 #[error("offset overflow")]
134 OffsetOverflow,
135 #[error("io error: {0}")]
136 Io(#[from] IoError),
137 #[error("buffer pool: {0}")]
138 Pool(#[from] PoolError),
139 }
140
/// Entry point that drives a root task to completion.
pub trait Runner {
    /// Value handed to the root closure passed to [`Runner::start`].
    type Context;

    /// Calls `f` with a [`Runner::Context`] and returns the output of the
    /// future that `f` produces.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
161
162 pub trait Spawner: Clone + Send + Sync + 'static {
164 fn shared(self, blocking: bool) -> Self;
173
174 fn dedicated(self) -> Self;
181
182 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
216 where
217 F: FnOnce(Self) -> Fut + Send + 'static,
218 Fut: Future<Output = T> + Send + 'static,
219 T: Send + 'static;
220
221 fn stop(
241 self,
242 value: i32,
243 timeout: Option<Duration>,
244 ) -> impl Future<Output = Result<(), Error>> + Send;
245
246 fn stopped(&self) -> signal::Signal;
253 }
254
255 pub trait ThreadPooler: Spawner + Metrics {
258 fn create_thread_pool(
267 &self,
268 concurrency: NonZeroUsize,
269 ) -> Result<ThreadPool, ThreadPoolBuildError>;
270
271 fn create_strategy(
280 &self,
281 concurrency: NonZeroUsize,
282 ) -> Result<Rayon, ThreadPoolBuildError> {
283 self.create_thread_pool(concurrency).map(Rayon::with_pool)
284 }
285 }
286
287 pub trait Metrics: Clone + Send + Sync + 'static {
321 fn label(&self) -> String;
323
324 fn with_label(&self, label: &str) -> Self;
332
333 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;
420
421 fn with_scope(&self) -> Self;
452
453 fn with_span(&self) -> Self;
466
467 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
473
474 fn encode(&self) -> String;
481 }
482
483 pub type RateLimiter<C> = governor::RateLimiter<
488 governor::state::NotKeyed,
489 governor::state::InMemoryState,
490 C,
491 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
492 >;
493
494 pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
501 K,
502 governor::state::keyed::HashMapStateStore<K>,
503 C,
504 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
505 >;
506
507 pub trait Clock:
513 governor::clock::Clock<Instant = SystemTime>
514 + governor::clock::ReasonablyRealtime
515 + Clone
516 + Send
517 + Sync
518 + 'static
519 {
520 fn current(&self) -> SystemTime;
522
523 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
525
526 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
528
529 fn timeout<F, T>(
550 &self,
551 duration: Duration,
552 future: F,
553 ) -> impl Future<Output = Result<T, Error>> + Send + '_
554 where
555 F: Future<Output = T> + Send + 'static,
556 T: Send + 'static,
557 {
558 async move {
559 select! {
560 result = future => Ok(result),
561 _ = self.sleep(duration) => Err(Error::Timeout),
562 }
563 }
564 }
565 }
566
567 pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
569
570 pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
572
573 pub type ListenerOf<N> = <N as crate::Network>::Listener;
575
576 pub trait Network: Clone + Send + Sync + 'static {
579 type Listener: Listener;
583
584 fn bind(
586 &self,
587 socket: SocketAddr,
588 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
589
590 fn dial(
592 &self,
593 socket: SocketAddr,
594 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
595 }
596
597 pub trait Resolver: Clone + Send + Sync + 'static {
599 fn resolve(
603 &self,
604 host: &str,
605 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
606 }
607
608 pub trait Listener: Sync + Send + 'static {
611 type Sink: Sink;
614 type Stream: Stream;
617
618 fn accept(
620 &mut self,
621 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
622
623 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
625 }
626
627 pub trait Sink: Sync + Send + 'static {
630 fn send(
636 &mut self,
637 bufs: impl Into<IoBufs> + Send,
638 ) -> impl Future<Output = Result<(), Error>> + Send;
639 }
640
641 pub trait Stream: Sync + Send + 'static {
644 fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;
652
653 fn peek(&self, max_len: usize) -> &[u8];
661 }
662
663 pub trait Storage: Clone + Send + Sync + 'static {
676 type Blob: Blob;
678
679 fn open(
682 &self,
683 partition: &str,
684 name: &[u8],
685 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
686 async move {
687 let (blob, size, _) = self
688 .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
689 .await?;
690 Ok((blob, size))
691 }
692 }
693
694 fn open_versioned(
711 &self,
712 partition: &str,
713 name: &[u8],
714 versions: std::ops::RangeInclusive<u16>,
715 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
716
717 fn remove(
723 &self,
724 partition: &str,
725 name: Option<&[u8]>,
726 ) -> impl Future<Output = Result<(), Error>> + Send;
727
728 fn scan(&self, partition: &str)
730 -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
731 }
732
733 #[allow(clippy::len_without_is_empty)]
748 pub trait Blob: Clone + Send + Sync + 'static {
749 fn read_at_buf(
765 &self,
766 offset: u64,
767 len: usize,
768 bufs: impl Into<IoBufsMut> + Send,
769 ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
770
771 fn read_at(
776 &self,
777 offset: u64,
778 len: usize,
779 ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
780
781 fn write_at(
783 &self,
784 offset: u64,
785 bufs: impl Into<IoBufs> + Send,
786 ) -> impl Future<Output = Result<(), Error>> + Send;
787
788 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
793
794 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
796 }
797
798 pub trait BufferPooler: Clone + Send + Sync + 'static {
800 fn network_buffer_pool(&self) -> &BufferPool;
802
803 fn storage_buffer_pool(&self) -> &BufferPool;
805 }
806});
807stability_scope!(BETA, cfg(feature = "external") {
808 pub trait Pacer: Clock + Clone + Send + Sync + 'static {
810 fn pace<'a, F, T>(
830 &'a self,
831 latency: Duration,
832 future: F,
833 ) -> impl Future<Output = T> + Send + 'a
834 where
835 F: Future<Output = T> + Send + 'a,
836 T: Send + 'a;
837 }
838
839 pub trait FutureExt: Future + Send + Sized {
844 fn pace<'a, E>(
846 self,
847 pacer: &'a E,
848 latency: Duration,
849 ) -> impl Future<Output = Self::Output> + Send + 'a
850 where
851 E: Pacer + 'a,
852 Self: Send + 'a,
853 Self::Output: Send + 'a,
854 {
855 pacer.pace(latency, self)
856 }
857 }
858
859 impl<F> FutureExt for F where F: Future + Send {}
860});
861
862#[cfg(test)]
863mod tests {
864 use super::*;
865 use crate::telemetry::traces::collector::TraceStorage;
866 use bytes::Bytes;
867 use commonware_macros::{select, test_collect_traces};
868 use commonware_utils::{
869 channel::{mpsc, oneshot},
870 sync::Mutex,
871 NZUsize, SystemTimeExt,
872 };
873 use futures::{
874 future::{pending, ready},
875 join, pin_mut, FutureExt,
876 };
877 use prometheus_client::{
878 encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue},
879 metrics::{counter::Counter, family::Family},
880 };
881 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
882 use std::{
883 collections::HashMap,
884 net::{IpAddr, Ipv4Addr, Ipv6Addr},
885 pin::Pin,
886 str::FromStr,
887 sync::{
888 atomic::{AtomicU32, Ordering},
889 Arc,
890 },
891 task::{Context as TContext, Poll, Waker},
892 };
893 use tracing::{error, Level};
894 use utils::reschedule;
895
896 fn test_error_future<R: Runner>(runner: R) {
897 #[allow(clippy::unused_async)]
898 async fn error_future() -> Result<&'static str, &'static str> {
899 Err("An error occurred")
900 }
901 let result = runner.start(|_| error_future());
902 assert_eq!(result, Err("An error occurred"));
903 }
904
905 fn test_clock_sleep<R: Runner>(runner: R)
906 where
907 R::Context: Spawner + Clock,
908 {
909 runner.start(|context| async move {
910 let start = context.current();
912 let sleep_duration = Duration::from_millis(10);
913 context.sleep(sleep_duration).await;
914
915 let end = context.current();
917 assert!(end.duration_since(start).unwrap() >= sleep_duration);
918 });
919 }
920
921 fn test_clock_sleep_until<R: Runner>(runner: R)
922 where
923 R::Context: Spawner + Clock + Metrics,
924 {
925 runner.start(|context| async move {
926 let now = context.current();
928 context.sleep_until(now + Duration::from_millis(100)).await;
929
930 let elapsed = now.elapsed().unwrap();
932 assert!(elapsed >= Duration::from_millis(100));
933 });
934 }
935
936 fn test_clock_sleep_until_far_future<R: Runner>(runner: R)
937 where
938 R::Context: Spawner + Clock,
939 {
940 runner.start(|context| async move {
941 let sleep = context.sleep_until(SystemTime::limit());
942 let result = context.timeout(Duration::from_millis(1), sleep).await;
943 assert!(matches!(result, Err(Error::Timeout)));
944 });
945 }
946
947 fn test_clock_timeout<R: Runner>(runner: R)
948 where
949 R::Context: Spawner + Clock,
950 {
951 runner.start(|context| async move {
952 let result = context
954 .timeout(Duration::from_millis(100), async { "success" })
955 .await;
956 assert_eq!(result.unwrap(), "success");
957
958 let result = context
960 .timeout(Duration::from_millis(50), pending::<()>())
961 .await;
962 assert!(matches!(result, Err(Error::Timeout)));
963
964 let result = context
966 .timeout(
967 Duration::from_millis(100),
968 context.sleep(Duration::from_millis(50)),
969 )
970 .await;
971 assert!(result.is_ok());
972 });
973 }
974
975 fn test_root_finishes<R: Runner>(runner: R)
976 where
977 R::Context: Spawner,
978 {
979 runner.start(|context| async move {
980 context.spawn(|_| async move {
981 loop {
982 reschedule().await;
983 }
984 });
985 });
986 }
987
988 fn test_spawn_after_abort<R>(runner: R)
989 where
990 R: Runner,
991 R::Context: Spawner + Clone,
992 {
993 runner.start(|context| async move {
994 let child = context.clone();
996
997 let parent_handle = context.spawn(move |_| async move {
999 pending::<()>().await;
1000 });
1001 parent_handle.abort();
1002
1003 let child_handle = child.spawn(move |_| async move {
1005 pending::<()>().await;
1006 });
1007 assert!(matches!(child_handle.await, Err(Error::Closed)));
1008 });
1009 }
1010
1011 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
1012 where
1013 R::Context: Spawner,
1014 {
1015 runner.start(|context| async move {
1016 let context = if dedicated {
1017 assert!(!blocking);
1018 context.dedicated()
1019 } else {
1020 context.shared(blocking)
1021 };
1022
1023 let handle = context.spawn(|_| async move {
1024 loop {
1025 reschedule().await;
1026 }
1027 });
1028 handle.abort();
1029 assert!(matches!(handle.await, Err(Error::Closed)));
1030 });
1031 }
1032
1033 fn test_panic_aborts_root<R: Runner>(runner: R) {
1034 let result: Result<(), Error> = runner.start(|_| async move {
1035 panic!("blah");
1036 });
1037 result.unwrap_err();
1038 }
1039
1040 fn test_panic_aborts_spawn<R: Runner>(runner: R)
1041 where
1042 R::Context: Spawner + Clock,
1043 {
1044 runner.start(|context| async move {
1045 context.clone().spawn(|_| async move {
1046 panic!("blah");
1047 });
1048
1049 loop {
1051 context.sleep(Duration::from_millis(100)).await;
1052 }
1053 });
1054 }
1055
1056 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1057 where
1058 R::Context: Spawner + Clock,
1059 {
1060 let result: Result<(), Error> = runner.start(|context| async move {
1061 let result = context.clone().spawn(|_| async move {
1062 panic!("blah");
1063 });
1064 result.await
1065 });
1066 assert!(matches!(result, Err(Error::Exited)));
1067 }
1068
1069 fn test_multiple_panics<R: Runner>(runner: R)
1070 where
1071 R::Context: Spawner + Clock,
1072 {
1073 runner.start(|context| async move {
1074 context.clone().spawn(|_| async move {
1075 panic!("boom 1");
1076 });
1077 context.clone().spawn(|_| async move {
1078 panic!("boom 2");
1079 });
1080 context.clone().spawn(|_| async move {
1081 panic!("boom 3");
1082 });
1083
1084 loop {
1086 context.sleep(Duration::from_millis(100)).await;
1087 }
1088 });
1089 }
1090
1091 fn test_multiple_panics_caught<R: Runner>(runner: R)
1092 where
1093 R::Context: Spawner + Clock,
1094 {
1095 let (res1, res2, res3) = runner.start(|context| async move {
1096 let handle1 = context.clone().spawn(|_| async move {
1097 panic!("boom 1");
1098 });
1099 let handle2 = context.clone().spawn(|_| async move {
1100 panic!("boom 2");
1101 });
1102 let handle3 = context.clone().spawn(|_| async move {
1103 panic!("boom 3");
1104 });
1105
1106 join!(handle1, handle2, handle3)
1107 });
1108 assert!(matches!(res1, Err(Error::Exited)));
1109 assert!(matches!(res2, Err(Error::Exited)));
1110 assert!(matches!(res3, Err(Error::Exited)));
1111 }
1112
1113 fn test_select<R: Runner>(runner: R) {
1114 runner.start(|_| async move {
1115 let output = Mutex::new(0);
1117 select! {
1118 v1 = ready(1) => {
1119 *output.lock() = v1;
1120 },
1121 v2 = ready(2) => {
1122 *output.lock() = v2;
1123 },
1124 };
1125 assert_eq!(*output.lock(), 1);
1126
1127 select! {
1129 v1 = std::future::pending::<i32>() => {
1130 *output.lock() = v1;
1131 },
1132 v2 = ready(2) => {
1133 *output.lock() = v2;
1134 },
1135 };
1136 assert_eq!(*output.lock(), 2);
1137 });
1138 }
1139
1140 fn test_select_loop<R: Runner>(runner: R)
1142 where
1143 R::Context: Clock,
1144 {
1145 runner.start(|context| async move {
1146 let (sender, mut receiver) = mpsc::unbounded_channel();
1148 for _ in 0..2 {
1149 select! {
1150 v = receiver.recv() => {
1151 panic!("unexpected value: {v:?}");
1152 },
1153 _ = context.sleep(Duration::from_millis(100)) => {
1154 continue;
1155 },
1156 };
1157 }
1158
1159 sender.send(0).unwrap();
1161 sender.send(1).unwrap();
1162
1163 select! {
1165 _ = async {} => {
1166 },
1168 v = receiver.recv() => {
1169 panic!("unexpected value: {v:?}");
1170 },
1171 };
1172
1173 for i in 0..2 {
1175 select! {
1176 _ = context.sleep(Duration::from_millis(100)) => {
1177 panic!("timeout");
1178 },
1179 v = receiver.recv() => {
1180 assert_eq!(v.unwrap(), i);
1181 },
1182 };
1183 }
1184 });
1185 }
1186
1187 fn test_storage_operations<R: Runner>(runner: R)
1188 where
1189 R::Context: Storage,
1190 {
1191 runner.start(|context| async move {
1192 let partition = "test_partition";
1193 let name = b"test_blob";
1194
1195 let (blob, size) = context
1197 .open(partition, name)
1198 .await
1199 .expect("Failed to open blob");
1200 assert_eq!(size, 0, "new blob should have size 0");
1201
1202 let data = b"Hello, Storage!";
1204 blob.write_at(0, data)
1205 .await
1206 .expect("Failed to write to blob");
1207
1208 blob.sync().await.expect("Failed to sync blob");
1210
1211 let read = blob
1213 .read_at(0, data.len())
1214 .await
1215 .expect("Failed to read from blob");
1216 assert_eq!(read.coalesce(), data);
1217
1218 blob.sync().await.expect("Failed to sync blob");
1220
1221 let blobs = context
1223 .scan(partition)
1224 .await
1225 .expect("Failed to scan partition");
1226 assert!(blobs.contains(&name.to_vec()));
1227
1228 let (blob, len) = context
1230 .open(partition, name)
1231 .await
1232 .expect("Failed to reopen blob");
1233 assert_eq!(len, data.len() as u64);
1234
1235 let read = blob.read_at(7, 7).await.expect("Failed to read data");
1237 assert_eq!(read.coalesce(), b"Storage");
1238
1239 blob.sync().await.expect("Failed to sync blob");
1241
1242 context
1244 .remove(partition, Some(name))
1245 .await
1246 .expect("Failed to remove blob");
1247
1248 let blobs = context
1250 .scan(partition)
1251 .await
1252 .expect("Failed to scan partition");
1253 assert!(!blobs.contains(&name.to_vec()));
1254
1255 context
1257 .remove(partition, None)
1258 .await
1259 .expect("Failed to remove partition");
1260
1261 let result = context.scan(partition).await;
1263 assert!(matches!(result, Err(Error::PartitionMissing(_))));
1264 });
1265 }
1266
1267 fn test_blob_read_write<R: Runner>(runner: R)
1268 where
1269 R::Context: Storage,
1270 {
1271 runner.start(|context| async move {
1272 let partition = "test_partition";
1273 let name = b"test_blob_rw";
1274
1275 let (blob, _) = context
1277 .open(partition, name)
1278 .await
1279 .expect("Failed to open blob");
1280
1281 let data1 = b"Hello";
1283 let data2 = b"World";
1284 blob.write_at(0, data1)
1285 .await
1286 .expect("Failed to write data1");
1287 blob.write_at(5, data2)
1288 .await
1289 .expect("Failed to write data2");
1290
1291 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1293 let read = read.coalesce();
1294 assert_eq!(&read.as_ref()[..5], data1);
1295 assert_eq!(&read.as_ref()[5..], data2);
1296
1297 let result = blob.read_at(10, 10).await;
1299 assert!(result.is_err());
1300
1301 let data3 = b"Store";
1303 blob.write_at(5, data3)
1304 .await
1305 .expect("Failed to write data3");
1306
1307 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1309 let read = read.coalesce();
1310 assert_eq!(&read.as_ref()[..5], data1);
1311 assert_eq!(&read.as_ref()[5..], data3);
1312
1313 let result = blob.read_at(10, 10).await;
1315 assert!(result.is_err());
1316 });
1317 }
1318
1319 fn test_blob_resize<R: Runner>(runner: R)
1320 where
1321 R::Context: Storage,
1322 {
1323 runner.start(|context| async move {
1324 let partition = "test_partition_resize";
1325 let name = b"test_blob_resize";
1326
1327 let (blob, _) = context
1329 .open(partition, name)
1330 .await
1331 .expect("Failed to open blob");
1332
1333 let data = b"some data";
1334 blob.write_at(0, data.to_vec())
1335 .await
1336 .expect("Failed to write");
1337 blob.sync().await.expect("Failed to sync after write");
1338
1339 let (blob, len) = context.open(partition, name).await.unwrap();
1341 assert_eq!(len, data.len() as u64);
1342
1343 let new_len = (data.len() as u64) * 2;
1345 blob.resize(new_len)
1346 .await
1347 .expect("Failed to resize to extend");
1348 blob.sync().await.expect("Failed to sync after resize");
1349
1350 let (blob, len) = context.open(partition, name).await.unwrap();
1352 assert_eq!(len, new_len);
1353
1354 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1356 assert_eq!(read_buf.coalesce(), data);
1357
1358 let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
1360 assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());
1361
1362 blob.resize(data.len() as u64).await.unwrap();
1364 blob.sync().await.unwrap();
1365
1366 let (blob, size) = context.open(partition, name).await.unwrap();
1368 assert_eq!(size, data.len() as u64);
1369
1370 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1372 assert_eq!(read_buf.coalesce(), data);
1373 blob.sync().await.unwrap();
1374 });
1375 }
1376
1377 fn test_many_partition_read_write<R: Runner>(runner: R)
1378 where
1379 R::Context: Storage,
1380 {
1381 runner.start(|context| async move {
1382 let partitions = ["partition1", "partition2", "partition3"];
1383 let name = b"test_blob_rw";
1384 let data1 = b"Hello";
1385 let data2 = b"World";
1386
1387 for (additional, partition) in partitions.iter().enumerate() {
1388 let (blob, _) = context
1390 .open(partition, name)
1391 .await
1392 .expect("Failed to open blob");
1393
1394 blob.write_at(0, data1)
1396 .await
1397 .expect("Failed to write data1");
1398 blob.write_at(5 + additional as u64, data2)
1399 .await
1400 .expect("Failed to write data2");
1401
1402 blob.sync().await.expect("Failed to sync blob");
1404 }
1405
1406 for (additional, partition) in partitions.iter().enumerate() {
1407 let (blob, len) = context
1409 .open(partition, name)
1410 .await
1411 .expect("Failed to open blob");
1412 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1413
1414 let read = blob
1416 .read_at(0, 10 + additional)
1417 .await
1418 .expect("Failed to read data");
1419 let read = read.coalesce();
1420 assert_eq!(&read.as_ref()[..5], b"Hello");
1421 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1422 }
1423 });
1424 }
1425
1426 fn test_blob_read_past_length<R: Runner>(runner: R)
1427 where
1428 R::Context: Storage,
1429 {
1430 runner.start(|context| async move {
1431 let partition = "test_partition";
1432 let name = b"test_blob_rw";
1433
1434 let (blob, _) = context
1436 .open(partition, name)
1437 .await
1438 .expect("Failed to open blob");
1439
1440 let result = blob.read_at(0, 10).await;
1442 assert!(result.is_err());
1443
1444 let data = b"Hello, Storage!".to_vec();
1446 blob.write_at(0, data)
1447 .await
1448 .expect("Failed to write to blob");
1449
1450 let result = blob.read_at(0, 20).await;
1452 assert!(result.is_err());
1453 })
1454 }
1455
1456 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1457 where
1458 R::Context: Spawner + Storage + Metrics,
1459 {
1460 runner.start(|context| async move {
1461 let partition = "test_partition";
1462 let name = b"test_blob_rw";
1463
1464 let (blob, _) = context
1466 .open(partition, name)
1467 .await
1468 .expect("Failed to open blob");
1469
1470 let data = b"Hello, Storage!";
1472 blob.write_at(0, data)
1473 .await
1474 .expect("Failed to write to blob");
1475
1476 blob.sync().await.expect("Failed to sync blob");
1478
1479 let check1 = context.with_label("check1").spawn({
1481 let blob = blob.clone();
1482 let data_len = data.len();
1483 move |_| async move {
1484 let read = blob
1485 .read_at(0, data_len)
1486 .await
1487 .expect("Failed to read from blob");
1488 assert_eq!(read.coalesce(), data);
1489 }
1490 });
1491 let check2 = context.with_label("check2").spawn({
1492 let blob = blob.clone();
1493 let data_len = data.len();
1494 move |_| async move {
1495 let read = blob
1496 .read_at(0, data_len)
1497 .await
1498 .expect("Failed to read from blob");
1499 assert_eq!(read.coalesce(), data);
1500 }
1501 });
1502
1503 let result = join!(check1, check2);
1505 assert!(result.0.is_ok());
1506 assert!(result.1.is_ok());
1507
1508 let read = blob
1510 .read_at(0, data.len())
1511 .await
1512 .expect("Failed to read from blob");
1513 assert_eq!(read.coalesce(), data);
1514
1515 drop(blob);
1517
1518 let buffer = context.encode();
1520 assert!(buffer.contains("open_blobs 0"));
1521 });
1522 }
1523
1524 fn test_shutdown<R: Runner>(runner: R)
1525 where
1526 R::Context: Spawner + Metrics + Clock,
1527 {
1528 let kill = 9;
1529 runner.start(|context| async move {
1530 let before = context
1532 .with_label("before")
1533 .spawn(move |context| async move {
1534 let mut signal = context.stopped();
1535 let value = (&mut signal).await.unwrap();
1536 assert_eq!(value, kill);
1537 drop(signal);
1538 });
1539
1540 let result = context.clone().stop(kill, None).await;
1542 assert!(result.is_ok());
1543
1544 let after = context
1546 .with_label("after")
1547 .spawn(move |context| async move {
1548 let value = context.stopped().await.unwrap();
1550 assert_eq!(value, kill);
1551 });
1552
1553 let result = join!(before, after);
1555 assert!(result.0.is_ok());
1556 assert!(result.1.is_ok());
1557 });
1558 }
1559
1560 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1561 where
1562 R::Context: Spawner + Metrics + Clock,
1563 {
1564 let kill = 42;
1565 runner.start(|context| async move {
1566 let (started_tx, mut started_rx) = mpsc::channel(3);
1567 let counter = Arc::new(AtomicU32::new(0));
1568
1569 let task = |cleanup_duration: Duration| {
1572 let context = context.clone();
1573 let counter = counter.clone();
1574 let started_tx = started_tx.clone();
1575 context.spawn(move |context| async move {
1576 let mut signal = context.stopped();
1578 started_tx.send(()).await.unwrap();
1579
1580 let value = (&mut signal).await.unwrap();
1582 assert_eq!(value, kill);
1583 context.sleep(cleanup_duration).await;
1584 counter.fetch_add(1, Ordering::SeqCst);
1585
1586 drop(signal);
1588 })
1589 };
1590
1591 let task1 = task(Duration::from_millis(10));
1592 let task2 = task(Duration::from_millis(20));
1593 let task3 = task(Duration::from_millis(30));
1594
1595 for _ in 0..3 {
1597 started_rx.recv().await.unwrap();
1598 }
1599
1600 context.stop(kill, None).await.unwrap();
1602 assert_eq!(counter.load(Ordering::SeqCst), 3);
1603
1604 let result = join!(task1, task2, task3);
1606 assert!(result.0.is_ok());
1607 assert!(result.1.is_ok());
1608 assert!(result.2.is_ok());
1609 });
1610 }
1611
1612 fn test_shutdown_timeout<R: Runner>(runner: R)
1613 where
1614 R::Context: Spawner + Metrics + Clock,
1615 {
1616 let kill = 42;
1617 runner.start(|context| async move {
1618 let (started_tx, started_rx) = oneshot::channel();
1620
1621 context.clone().spawn(move |context| async move {
1623 let signal = context.stopped();
1624 started_tx.send(()).unwrap();
1625 pending::<()>().await;
1626 signal.await.unwrap();
1627 });
1628
1629 started_rx.await.unwrap();
1631 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1632
1633 assert!(matches!(result, Err(Error::Timeout)));
1635 });
1636 }
1637
1638 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1639 where
1640 R::Context: Spawner + Metrics + Clock,
1641 {
1642 let kill1 = 42;
1643 let kill2 = 43;
1644
1645 runner.start(|context| async move {
1646 let (started_tx, started_rx) = oneshot::channel();
1647 let counter = Arc::new(AtomicU32::new(0));
1648
1649 let task = context.with_label("blocking_task").spawn({
1651 let counter = counter.clone();
1652 move |context| async move {
1653 let mut signal = context.stopped();
1655 started_tx.send(()).unwrap();
1656
1657 let value = (&mut signal).await.unwrap();
1659 assert_eq!(value, kill1);
1660 context.sleep(Duration::from_millis(50)).await;
1661
1662 counter.fetch_add(1, Ordering::SeqCst);
1664 drop(signal);
1665 }
1666 });
1667
1668 started_rx.await.unwrap();
1670
1671 let stop_task1 = context.clone().stop(kill1, None);
1674 pin_mut!(stop_task1);
1675 let stop_task2 = context.clone().stop(kill2, None);
1676 pin_mut!(stop_task2);
1677
1678 assert!(stop_task1.as_mut().now_or_never().is_none());
1680 assert!(stop_task2.as_mut().now_or_never().is_none());
1681
1682 assert!(stop_task1.await.is_ok());
1684 assert!(stop_task2.await.is_ok());
1685
1686 let sig = context.stopped().await;
1688 assert_eq!(sig.unwrap(), kill1);
1689
1690 let result = task.await;
1692 assert!(result.is_ok());
1693 assert_eq!(counter.load(Ordering::SeqCst), 1);
1694
1695 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1697 });
1698 }
1699
1700 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1701 where
1702 R::Context: Spawner + Metrics,
1703 {
1704 runner.start(|context| async move {
1705 context
1707 .with_label("before")
1708 .spawn(move |context| async move {
1709 let mut signal = context.stopped();
1710 let value = (&mut signal).await.unwrap();
1711
1712 assert_eq!(value, 42);
1714 drop(signal);
1715 });
1716
1717 reschedule().await;
1719 });
1720 }
1721
1722 fn test_spawn_dedicated<R: Runner>(runner: R)
1723 where
1724 R::Context: Spawner,
1725 {
1726 runner.start(|context| async move {
1727 let handle = context.dedicated().spawn(|_| async move { 42 });
1728 assert!(matches!(handle.await, Ok(42)));
1729 });
1730 }
1731
1732 fn test_spawn<R: Runner>(runner: R)
1733 where
1734 R::Context: Spawner + Clock,
1735 {
1736 runner.start(|context| async move {
1737 let child_handle = Arc::new(Mutex::new(None));
1738 let child_handle2 = child_handle.clone();
1739
1740 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1741 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1742 let parent_handle = context.spawn(move |context| async move {
1743 let handle = context.spawn(|_| async {});
1745
1746 *child_handle2.lock() = Some(handle);
1748
1749 parent_initialized_tx.send(()).unwrap();
1750
1751 parent_complete_rx.await.unwrap();
1753 });
1754
1755 parent_initialized_rx.await.unwrap();
1757
1758 let child_handle = child_handle.lock().take().unwrap();
1760 assert!(child_handle.await.is_ok());
1761
1762 parent_complete_tx.send(()).unwrap();
1764
1765 assert!(parent_handle.await.is_ok());
1767 });
1768 }
1769
1770 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1771 where
1772 R::Context: Spawner + Clock,
1773 {
1774 runner.start(|context| async move {
1775 let child_handle = Arc::new(Mutex::new(None));
1776 let child_handle2 = child_handle.clone();
1777
1778 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1779 let parent_handle = context.spawn(move |context| async move {
1780 let handle = context.spawn(|_| pending::<()>());
1782
1783 *child_handle2.lock() = Some(handle);
1785
1786 parent_initialized_tx.send(()).unwrap();
1787
1788 pending::<()>().await
1790 });
1791
1792 parent_initialized_rx.await.unwrap();
1794
1795 parent_handle.abort();
1797 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1798
1799 let child_handle = child_handle.lock().take().unwrap();
1801 assert!(matches!(child_handle.await, Err(Error::Closed)));
1802 });
1803 }
1804
1805 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1806 where
1807 R::Context: Spawner + Clock,
1808 {
1809 runner.start(|context| async move {
1810 let child_handle = Arc::new(Mutex::new(None));
1811 let child_handle2 = child_handle.clone();
1812
1813 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1814 let parent_handle = context.spawn(move |context| async move {
1815 let handle = context.spawn(|_| pending::<()>());
1817
1818 *child_handle2.lock() = Some(handle);
1820
1821 parent_complete_rx.await.unwrap();
1823 });
1824
1825 parent_complete_tx.send(()).unwrap();
1827
1828 assert!(parent_handle.await.is_ok());
1830
1831 let child_handle = child_handle.lock().take().unwrap();
1833 assert!(matches!(child_handle.await, Err(Error::Closed)));
1834 });
1835 }
1836
/// Verifies that aborting a root task closes every task spawned beneath it.
///
/// The root task spawns three children (from `c0`, `c1`, `c2`) and each child
/// spawns two grandchildren (from `g0`..`g5`), for nine handles in total. After
/// the root is aborted, every collected handle is expected to resolve to
/// `Err(Error::Closed)`.
fn test_spawn_cascading_abort<R: Runner>(runner: R)
where
    R::Context: Spawner + Clock,
{
    runner.start(|context| async move {
        // All contexts are cloned up front, outside of the root task: one
        // clone per child (`cN`) and two further clones of each for its
        // grandchildren (`gN`).
        let c0 = context.clone();
        let g0 = c0.clone();
        let g1 = c0.clone();
        let c1 = context.clone();
        let g2 = c1.clone();
        let g3 = c1.clone();
        let c2 = context.clone();
        let g4 = c2.clone();
        let g5 = c2.clone();

        // Shared list collecting the handle of every child and grandchild.
        let handles = Arc::new(Mutex::new(Vec::new()));
        // One notification per spawned descendant (3 children + 6 grandchildren).
        let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
        let root_task = context.spawn({
            let handles = handles.clone();
            move |_| async move {
                for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                {
                    // Spawn a child, which in turn spawns its two grandchildren.
                    let handle = context.spawn({
                        let handles = handles.clone();
                        let initialized_tx = initialized_tx.clone();
                        move |_| async move {
                            for grandchild in grandchildren {
                                // Grandchildren never finish on their own.
                                let handle = grandchild.spawn(|_| async {
                                    pending::<()>().await;
                                });
                                handles.lock().push(handle);
                                initialized_tx.send(()).await.unwrap();
                            }

                            // The child never finishes on its own either.
                            pending::<()>().await;
                        }
                    });
                    handles.lock().push(handle);
                    initialized_tx.send(()).await.unwrap();
                }

                // Keep the root alive until it is aborted.
                pending::<()>().await;
            }
        });

        // Wait until all nine descendants have reported in.
        for _ in 0..9 {
            initialized_rx.recv().await.unwrap();
        }

        // Every descendant's handle must have been recorded.
        assert_eq!(handles.lock().len(), 9);

        // Abort the root; its own handle resolves with `Error::Closed`.
        root_task.abort();
        assert!(matches!(root_task.await, Err(Error::Closed)));

        // Move the handles out of the mutex before awaiting them, then check
        // that every descendant was closed as well.
        let handles = handles.lock().drain(..).collect::<Vec<_>>();
        for handle in handles {
            assert!(matches!(handle.await, Err(Error::Closed)));
        }
    });
}
1911
1912 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1913 where
1914 R::Context: Spawner + Clock,
1915 {
1916 runner.start(|context| async move {
1917 let (child_started_tx, child_started_rx) = oneshot::channel();
1918 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1919 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1920 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1921 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1922 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1923 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1924
1925 let parent = context.spawn(move |context| async move {
1926 let child_handle = context.clone().spawn(|_| async move {
1928 child_started_tx.send(()).unwrap();
1929 child_complete_rx.await.unwrap();
1931 });
1932 assert!(
1933 child_handle_tx.send(child_handle).is_ok(),
1934 "child handle receiver dropped"
1935 );
1936
1937 let sibling_handle = context.clone().spawn(move |_| async move {
1939 sibling_started_tx.send(()).unwrap();
1940 sibling_complete_rx.await.unwrap();
1942 });
1943 assert!(
1944 sibling_handle_tx.send(sibling_handle).is_ok(),
1945 "sibling handle receiver dropped"
1946 );
1947
1948 parent_complete_rx.await.unwrap();
1950 });
1951
1952 child_started_rx.await.unwrap();
1954 sibling_started_rx.await.unwrap();
1955
1956 sibling_complete_tx.send(()).unwrap();
1958 assert!(sibling_handle_rx.await.is_ok());
1959
1960 child_complete_tx.send(()).unwrap();
1962 assert!(child_handle_rx.await.is_ok());
1963
1964 parent_complete_tx.send(()).unwrap();
1966 assert!(parent.await.is_ok());
1967 });
1968 }
1969
1970 fn test_spawn_clone_chain<R: Runner>(runner: R)
1971 where
1972 R::Context: Spawner + Clock,
1973 {
1974 runner.start(|context| async move {
1975 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1976 let (child_started_tx, child_started_rx) = oneshot::channel();
1977 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1978 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1979 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1980
1981 let parent = context.clone().spawn({
1982 move |context| async move {
1983 let child = context.clone().spawn({
1984 move |context| async move {
1985 let grandchild = context.clone().spawn({
1986 move |_| async move {
1987 grandchild_started_tx.send(()).unwrap();
1988 pending::<()>().await;
1989 }
1990 });
1991 assert!(
1992 grandchild_handle_tx.send(grandchild).is_ok(),
1993 "grandchild handle receiver dropped"
1994 );
1995 child_started_tx.send(()).unwrap();
1996 pending::<()>().await;
1997 }
1998 });
1999 assert!(
2000 child_handle_tx.send(child).is_ok(),
2001 "child handle receiver dropped"
2002 );
2003 parent_started_tx.send(()).unwrap();
2004 pending::<()>().await;
2005 }
2006 });
2007
2008 parent_started_rx.await.unwrap();
2009 child_started_rx.await.unwrap();
2010 grandchild_started_rx.await.unwrap();
2011
2012 let child_handle = child_handle_rx.await.unwrap();
2013 let grandchild_handle = grandchild_handle_rx.await.unwrap();
2014
2015 parent.abort();
2016 assert!(parent.await.is_err());
2017
2018 assert!(child_handle.await.is_err());
2019 assert!(grandchild_handle.await.is_err());
2020 });
2021 }
2022
2023 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
2024 where
2025 R::Context: Spawner + Clock,
2026 {
2027 runner.start(|context| async move {
2028 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
2029 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
2030
2031 let parent = context.clone().spawn({
2032 move |context| async move {
2033 let clone1 = context.clone();
2034 let clone2 = clone1.clone();
2035 let clone3 = clone2.clone();
2036
2037 let leaf = clone3.clone().spawn({
2038 move |_| async move {
2039 leaf_started_tx.send(()).unwrap();
2040 pending::<()>().await;
2041 }
2042 });
2043
2044 leaf_handle_tx
2045 .send(leaf)
2046 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
2047 pending::<()>().await;
2048 }
2049 });
2050
2051 leaf_started_rx.await.unwrap();
2052 let leaf_handle = leaf_handle_rx.await.unwrap();
2053
2054 parent.abort();
2055 assert!(parent.await.is_err());
2056 assert!(leaf_handle.await.is_err());
2057 });
2058 }
2059
2060 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2061 where
2062 R::Context: Spawner,
2063 {
2064 runner.start(|context| async move {
2065 let context = if dedicated {
2066 context.dedicated()
2067 } else {
2068 context.shared(true)
2069 };
2070
2071 let handle = context.spawn(|_| async move { 42 });
2072 let result = handle.await;
2073 assert!(matches!(result, Ok(42)));
2074 });
2075 }
2076
2077 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2078 where
2079 R::Context: Spawner + Clock,
2080 {
2081 runner.start(|context| async move {
2082 let context = if dedicated {
2083 context.dedicated()
2084 } else {
2085 context.shared(true)
2086 };
2087
2088 context.clone().spawn(|_| async move {
2089 panic!("blocking task panicked");
2090 });
2091
2092 loop {
2094 context.sleep(Duration::from_millis(100)).await;
2095 }
2096 });
2097 }
2098
2099 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2100 where
2101 R::Context: Spawner + Clock,
2102 {
2103 let result: Result<(), Error> = runner.start(|context| async move {
2104 let context = if dedicated {
2105 context.dedicated()
2106 } else {
2107 context.shared(true)
2108 };
2109
2110 let handle = context.clone().spawn(|_| async move {
2111 panic!("blocking task panicked");
2112 });
2113 handle.await
2114 });
2115 assert!(matches!(result, Err(Error::Exited)));
2116 }
2117
/// Checks that two tasks which keep each other's channels open do not hold
/// on to their captured state after the executor that ran them has returned.
///
/// Inside the supplied `runner`, a separate `deterministic::Runner` is started.
/// It spawns two tasks that each own the sender for the other's receiver and
/// loop until their own receiver closes, so neither loop can end unless the
/// other task's sender is dropped. Both tasks also own a clone of `dropper`.
/// Once the inner executor's `start` has returned, `Arc::try_unwrap` must
/// succeed, i.e. no clone of `dropper` may remain.
fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
    runner.start(|_| async move {
        // Reference-counted marker used to detect state that was never dropped.
        let dropper = Arc::new(());
        // The scenario always runs on its own deterministic executor.
        let executor = deterministic::Runner::default();
        executor.start({
            let dropper = dropper.clone();
            move |context| async move {
                // `setup_*` reports when each task has completed its handshake.
                let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                // task1 owns `rx1` and `tx2`; task2 owns `rx2` and `tx1`.
                let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                context.with_label("task1").spawn({
                    let setup_tx = setup_tx.clone();
                    let dropper = dropper.clone();
                    move |_| async move {
                        // Handshake: ping task2, wait for its ping, report ready.
                        tx2.send(()).unwrap();
                        rx1.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Runs until `rx1` closes, which requires `tx1` (owned
                        // by task2) to be dropped.
                        while rx1.recv().await.is_some() {}
                        drop(tx2);
                        drop(dropper);
                    }
                });

                context.with_label("task2").spawn(move |_| async move {
                    // Mirror image of task1.
                    tx1.send(()).unwrap();
                    rx2.recv().await.unwrap();
                    setup_tx.send(()).unwrap();

                    // Runs until `rx2` closes, which requires `tx2` (owned by
                    // task1) to be dropped.
                    while rx2.recv().await.is_some() {}
                    drop(tx1);
                    drop(dropper);
                });

                // Wait for both tasks to finish their handshake, then return
                // while both are still parked in their receive loops.
                setup_rx.recv().await.unwrap();
                setup_rx.recv().await.unwrap();
            }
        });

        // Panics with "references remaining" if any clone of `dropper` is
        // still alive after the inner executor has returned.
        Arc::try_unwrap(dropper).expect("references remaining");
    });
}
2171
/// Checks that a waker captured from a spawned task can still be invoked
/// after `runner.start` has returned.
///
/// A spawned task hands its waker to the root future, which wraps it in a
/// guard that wakes on drop and returns that guard as the output of `start`.
/// The guard is then dropped outside of the runner; the test passes if doing
/// so does not panic.
fn test_late_waker<R: Runner>(runner: R)
where
    R::Context: Metrics + Spawner,
{
    /// Future that sends a clone of its waker through `tx` the first time it
    /// is polled and never completes.
    struct CaptureWaker {
        tx: Option<oneshot::Sender<Waker>>,
        sent: bool,
    }
    impl Future for CaptureWaker {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
            if !self.sent {
                if let Some(tx) = self.tx.take() {
                    // A failed send (receiver already dropped) is ignored.
                    let _ = tx.send(cx.waker().clone());
                }
                self.sent = true;
            }
            // Always pending: the task stays alive for as long as the runtime keeps it.
            Poll::Pending
        }
    }

    /// Guard that invokes the wrapped waker when it is dropped.
    struct WakeOnDrop(Option<Waker>);
    impl Drop for WakeOnDrop {
        fn drop(&mut self) {
            if let Some(w) = self.0.take() {
                w.wake_by_ref();
            }
        }
    }

    // The root future's output (the guard) is returned by `start`.
    let holder = runner.start(|context| async move {
        let (tx, rx) = oneshot::channel::<Waker>();

        // Spawn the task whose waker is to be captured.
        context
            .with_label("capture_waker")
            .spawn(move |_| async move {
                CaptureWaker {
                    tx: Some(tx),
                    sent: false,
                }
                .await;
            });

        // Yield so the spawned task can be polled.
        utils::reschedule().await;

        let waker = rx.await.expect("waker not received");

        // Hand the waker out of the runner, wrapped in the wake-on-drop guard.
        WakeOnDrop(Some(waker))
    });

    // Dropping the guard wakes the captured waker after `start` has returned.
    drop(holder);
}
2236
2237 fn test_metrics<R: Runner>(runner: R)
2238 where
2239 R::Context: Metrics,
2240 {
2241 runner.start(|context| async move {
2242 assert_eq!(context.label(), "");
2244
2245 let counter = Counter::<u64>::default();
2247 context.register("test", "test", counter.clone());
2248
2249 counter.inc();
2251
2252 let buffer = context.encode();
2254 assert!(buffer.contains("test_total 1"));
2255
2256 let context = context.with_label("nested");
2258 let nested_counter = Counter::<u64>::default();
2259 context.register("test", "test", nested_counter.clone());
2260
2261 nested_counter.inc();
2263
2264 let buffer = context.encode();
2266 assert!(buffer.contains("nested_test_total 1"));
2267 assert!(buffer.contains("test_total 1"));
2268 });
2269 }
2270
2271 fn test_metrics_with_attribute<R: Runner>(runner: R)
2272 where
2273 R::Context: Metrics,
2274 {
2275 runner.start(|context| async move {
2276 let ctx_epoch5 = context
2278 .with_label("consensus")
2279 .with_attribute("epoch", "e5");
2280
2281 let counter = Counter::<u64>::default();
2283 ctx_epoch5.register("votes", "vote count", counter.clone());
2284 counter.inc();
2285
2286 let buffer = context.encode();
2288 assert!(
2289 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2290 "Expected metric with epoch attribute, got: {}",
2291 buffer
2292 );
2293
2294 let ctx_epoch6 = context
2296 .with_label("consensus")
2297 .with_attribute("epoch", "e6");
2298 let counter2 = Counter::<u64>::default();
2299 ctx_epoch6.register("votes", "vote count", counter2.clone());
2300 counter2.inc();
2301 counter2.inc();
2302
2303 let buffer = context.encode();
2305 assert!(
2306 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2307 "Expected metric with epoch=e5, got: {}",
2308 buffer
2309 );
2310 assert!(
2311 buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
2312 "Expected metric with epoch=e6, got: {}",
2313 buffer
2314 );
2315
2316 assert_eq!(
2318 buffer.matches("# HELP consensus_votes").count(),
2319 1,
2320 "HELP should appear exactly once, got: {}",
2321 buffer
2322 );
2323 assert_eq!(
2324 buffer.matches("# TYPE consensus_votes").count(),
2325 1,
2326 "TYPE should appear exactly once, got: {}",
2327 buffer
2328 );
2329
2330 let ctx_multi = context
2332 .with_label("engine")
2333 .with_attribute("region", "us")
2334 .with_attribute("instance", "i1");
2335 let counter3 = Counter::<u64>::default();
2336 ctx_multi.register("requests", "request count", counter3.clone());
2337 counter3.inc();
2338
2339 let buffer = context.encode();
2340 assert!(
2341 buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
2342 "Expected metric with sorted attributes, got: {}",
2343 buffer
2344 );
2345 });
2346 }
2347
/// Runs `test_metrics_with_attribute` with a default `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_with_attribute() {
    test_metrics_with_attribute(deterministic::Runner::default());
}
2353
/// Runs `test_metrics_with_attribute` with a default `tokio::Runner`.
#[test]
fn test_tokio_metrics_with_attribute() {
    test_metrics_with_attribute(tokio::Runner::default());
}
2359
2360 fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2361 where
2362 R::Context: Metrics,
2363 {
2364 runner.start(|context| async move {
2365 let ctx = context
2367 .with_label("orchestrator")
2368 .with_attribute("epoch", "e5")
2369 .with_label("engine");
2370
2371 let counter = Counter::<u64>::default();
2373 ctx.register("votes", "vote count", counter.clone());
2374 counter.inc();
2375
2376 let buffer = context.encode();
2378 assert!(
2379 buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2380 "Expected metric with preserved epoch attribute, got: {}",
2381 buffer
2382 );
2383
2384 let ctx2 = context
2386 .with_label("outer")
2387 .with_attribute("region", "us")
2388 .with_label("middle")
2389 .with_attribute("az", "east")
2390 .with_label("inner");
2391
2392 let counter2 = Counter::<u64>::default();
2393 ctx2.register("requests", "request count", counter2.clone());
2394 counter2.inc();
2395 counter2.inc();
2396
2397 let buffer = context.encode();
2398 assert!(
2399 buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2400 "Expected metric with all attributes preserved and sorted, got: {}",
2401 buffer
2402 );
2403 });
2404 }
2405
/// Runs `test_metrics_attribute_with_nested_label` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(deterministic::Runner::default());
}
2411
/// Runs `test_metrics_attribute_with_nested_label` with a default
/// `tokio::Runner`.
#[test]
fn test_tokio_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(tokio::Runner::default());
}
2417
2418 fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2419 where
2420 R::Context: Metrics,
2421 {
2422 runner.start(|context| async move {
2423 let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
2425 let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);
2426
2427 let c1 = Counter::<u64>::default();
2429 ctx_a.register("requests", "help", c1);
2430
2431 let c2 = Counter::<u64>::default();
2433 ctx_b.register("requests", "help", c2);
2434
2435 let c3 = Counter::<u64>::default();
2437 ctx_a.register("errors", "help", c3);
2438
2439 let output = context.encode();
2440
2441 assert!(
2443 output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2444 "ctx_a requests should have epoch=1: {output}"
2445 );
2446 assert!(
2447 output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2448 "ctx_a errors should have epoch=1: {output}"
2449 );
2450 assert!(
2451 !output.contains("component_a_requests_total{epoch=\"2\"}"),
2452 "ctx_a requests should not have epoch=2: {output}"
2453 );
2454
2455 assert!(
2457 output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2458 "ctx_b should have epoch=2: {output}"
2459 );
2460 assert!(
2461 !output.contains("component_b_requests_total{epoch=\"1\"}"),
2462 "ctx_b should not have epoch=1: {output}"
2463 );
2464 });
2465 }
2466
/// Runs `test_metrics_attributes_isolated_between_contexts` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(deterministic::Runner::default());
}
2472
/// Runs `test_metrics_attributes_isolated_between_contexts` with a default
/// `tokio::Runner`.
#[test]
fn test_tokio_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(tokio::Runner::default());
}
2478
2479 fn test_metrics_spawn_attribute_cardinality<R: Runner>(runner: R)
2486 where
2487 R::Context: Spawner + Metrics + Clock,
2488 {
2489 runner.start(|context| async move {
2490 const ROUNDS: u64 = 128;
2491
2492 let mut handles = Vec::with_capacity(ROUNDS as usize);
2493 for round in 0..ROUNDS {
2494 let handle = context
2495 .with_label("deferred_verify")
2496 .with_attribute("round", round)
2497 .spawn(move |_| async move { round });
2498 handles.push(handle);
2499 }
2500 for (expected, handle) in handles.into_iter().enumerate() {
2501 assert_eq!(handle.await.expect("task failed"), expected as u64);
2502 }
2503
2504 while count_running_tasks(&context, "deferred_verify") > 0 {
2508 context.sleep(Duration::from_millis(10)).await;
2509 }
2510 let buffer = context.encode();
2511
2512 let spawned_lines = buffer
2516 .lines()
2517 .filter(|line| {
2518 line.starts_with("runtime_tasks_spawned_total{")
2519 && line.contains("name=\"deferred_verify\"")
2520 })
2521 .count();
2522 let running_lines = buffer
2523 .lines()
2524 .filter(|line| {
2525 line.starts_with("runtime_tasks_running{")
2526 && line.contains("name=\"deferred_verify\"")
2527 })
2528 .count();
2529 assert_eq!(
2530 spawned_lines, 1,
2531 "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
2532 );
2533 assert_eq!(
2534 running_lines, 1,
2535 "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
2536 );
2537
2538 let spawned_value = format!(
2540 "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
2541 );
2542 assert!(
2543 buffer.contains(&spawned_value),
2544 "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
2545 );
2546 let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
2547 assert!(
2548 buffer.contains(running_value),
2549 "expected running gauge to return to 0, got: {buffer}",
2550 );
2551
2552 assert!(
2555 !buffer
2556 .lines()
2557 .any(|line| line.starts_with("runtime_tasks_")
2558 && line.contains("round=")),
2559 "task metrics must not carry `round` attribute: {buffer}",
2560 );
2561 });
2562 }
2563
/// Runs `test_metrics_spawn_attribute_cardinality` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_spawn_attribute_cardinality() {
    test_metrics_spawn_attribute_cardinality(deterministic::Runner::default());
}
2569
/// Runs `test_metrics_spawn_attribute_cardinality` with a default
/// `tokio::Runner`.
#[test]
fn test_tokio_metrics_spawn_attribute_cardinality() {
    test_metrics_spawn_attribute_cardinality(tokio::Runner::default());
}
2575
2576 fn test_metrics_spawn_scope_cardinality<R: Runner>(runner: R)
2585 where
2586 R::Context: Spawner + Metrics + Clock,
2587 {
2588 runner.start(|context| async move {
2589 const ROUNDS: u64 = 128;
2590
2591 let mut handles = Vec::with_capacity(ROUNDS as usize);
2592 for round in 0..ROUNDS {
2593 let scoped = context
2594 .with_label("deferred_verify")
2595 .with_attribute("round", round)
2596 .with_scope();
2597 let handle = scoped.spawn(move |_| async move { round });
2598 handles.push(handle);
2599 }
2601 for (expected, handle) in handles.into_iter().enumerate() {
2602 assert_eq!(handle.await.expect("task failed"), expected as u64);
2603 }
2604
2605 while count_running_tasks(&context, "deferred_verify") > 0 {
2609 context.sleep(Duration::from_millis(10)).await;
2610 }
2611 let buffer = context.encode();
2612
2613 let spawned_lines = buffer
2614 .lines()
2615 .filter(|line| {
2616 line.starts_with("runtime_tasks_spawned_total{")
2617 && line.contains("name=\"deferred_verify\"")
2618 })
2619 .count();
2620 let running_lines = buffer
2621 .lines()
2622 .filter(|line| {
2623 line.starts_with("runtime_tasks_running{")
2624 && line.contains("name=\"deferred_verify\"")
2625 })
2626 .count();
2627 assert_eq!(
2628 spawned_lines, 1,
2629 "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
2630 );
2631 assert_eq!(
2632 running_lines, 1,
2633 "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
2634 );
2635
2636 let spawned_value = format!(
2637 "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
2638 );
2639 assert!(
2640 buffer.contains(&spawned_value),
2641 "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
2642 );
2643 let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
2644 assert!(
2645 buffer.contains(running_value),
2646 "expected running gauge to return to 0, got: {buffer}",
2647 );
2648 });
2649 }
2650
/// Runs `test_metrics_spawn_scope_cardinality` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_spawn_scope_cardinality() {
    test_metrics_spawn_scope_cardinality(deterministic::Runner::default());
}
2656
/// Runs `test_metrics_spawn_scope_cardinality` with a default `tokio::Runner`.
#[test]
fn test_tokio_metrics_spawn_scope_cardinality() {
    test_metrics_spawn_scope_cardinality(tokio::Runner::default());
}
2662
2663 fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2664 where
2665 R::Context: Metrics,
2666 {
2667 runner.start(|context| async move {
2668 let ctx_ab = context
2670 .with_label("service")
2671 .with_attribute("region", "us")
2672 .with_attribute("env", "prod");
2673
2674 let ctx_ba = context
2675 .with_label("service")
2676 .with_attribute("env", "prod")
2677 .with_attribute("region", "us");
2678
2679 let c1 = Counter::<u64>::default();
2681 ctx_ab.register("requests", "help", c1.clone());
2682 c1.inc();
2683
2684 let c2 = Counter::<u64>::default();
2686 ctx_ba.register("errors", "help", c2.clone());
2687 c2.inc();
2688 c2.inc();
2689
2690 let output = context.encode();
2691
2692 assert!(
2694 output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2695 "requests should have sorted labels: {output}"
2696 );
2697 assert!(
2698 output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2699 "errors should have sorted labels: {output}"
2700 );
2701
2702 assert!(
2704 !output.contains("region=\"us\",env=\"prod\""),
2705 "should not have unsorted label order: {output}"
2706 );
2707 });
2708 }
2709
/// Runs `test_metrics_attributes_sorted_deterministically` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(deterministic::Runner::default());
}
2715
/// Runs `test_metrics_attributes_sorted_deterministically` with a default
/// `tokio::Runner`.
#[test]
fn test_tokio_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(tokio::Runner::default());
}
2721
2722 fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
2723 where
2724 R::Context: Metrics,
2725 {
2726 runner.start(|context| async move {
2727 let svc_a = context.with_label("service_a");
2729
2730 let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);
2732
2733 let svc_b_worker = context.with_label("service_b").with_label("worker");
2735
2736 let svc_b_worker_shard = context
2738 .with_label("service_b")
2739 .with_label("worker")
2740 .with_attribute("shard", 99);
2741
2742 let svc_b_manager = context.with_label("service_b").with_label("manager");
2744
2745 let svc_c = context.with_label("service_c");
2747
2748 let c1 = Counter::<u64>::default();
2750 svc_a.register("requests", "help", c1);
2751
2752 let c2 = Counter::<u64>::default();
2753 svc_a_v2.register("requests", "help", c2);
2754
2755 let c3 = Counter::<u64>::default();
2756 svc_b_worker.register("tasks", "help", c3);
2757
2758 let c4 = Counter::<u64>::default();
2759 svc_b_worker_shard.register("tasks", "help", c4);
2760
2761 let c5 = Counter::<u64>::default();
2762 svc_b_manager.register("decisions", "help", c5);
2763
2764 let c6 = Counter::<u64>::default();
2765 svc_c.register("requests", "help", c6);
2766
2767 let output = context.encode();
2768
2769 assert!(
2771 output.contains("service_a_requests_total 0"),
2772 "svc_a plain should exist: {output}"
2773 );
2774 assert!(
2775 output.contains("service_a_requests_total{version=\"2\"} 0"),
2776 "svc_a_v2 should have version=2: {output}"
2777 );
2778
2779 assert!(
2781 output.contains("service_b_worker_tasks_total 0"),
2782 "svc_b_worker plain should exist: {output}"
2783 );
2784 assert!(
2785 output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
2786 "svc_b_worker_shard should have shard=99: {output}"
2787 );
2788
2789 assert!(
2791 output.contains("service_b_manager_decisions_total 0"),
2792 "svc_b_manager should have no attributes: {output}"
2793 );
2794 assert!(
2795 !output.contains("service_b_manager_decisions_total{"),
2796 "svc_b_manager should have no attributes at all: {output}"
2797 );
2798
2799 assert!(
2801 output.contains("service_c_requests_total 0"),
2802 "svc_c should have no attributes: {output}"
2803 );
2804 assert!(
2805 !output.contains("service_c_requests_total{"),
2806 "svc_c should have no attributes at all: {output}"
2807 );
2808
2809 assert!(
2811 !output.contains("service_b_manager_decisions_total{shard="),
2812 "svc_b_manager should not have shard: {output}"
2813 );
2814 assert!(
2815 !output.contains("service_a_requests_total{shard="),
2816 "svc_a should not have shard: {output}"
2817 );
2818 assert!(
2819 !output.contains("service_c_requests_total{version="),
2820 "svc_c should not have version: {output}"
2821 );
2822 });
2823 }
2824
/// Runs `test_metrics_nested_labels_with_attributes` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(deterministic::Runner::default());
}
2830
/// Runs `test_metrics_nested_labels_with_attributes` with a default
/// `tokio::Runner`.
#[test]
fn test_tokio_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(tokio::Runner::default());
}
2836
2837 fn test_metrics_family_with_attributes<R: Runner>(runner: R)
2838 where
2839 R::Context: Metrics,
2840 {
2841 runner.start(|context| async move {
2842 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
2843 struct RequestLabels {
2844 method: String,
2845 status: u16,
2846 }
2847
2848 let ctx = context
2850 .with_label("api")
2851 .with_attribute("region", "us_east")
2852 .with_attribute("env", "prod");
2853
2854 let requests: Family<RequestLabels, Counter<u64>> = Family::default();
2856 ctx.register("requests", "HTTP requests", requests.clone());
2857
2858 requests
2860 .get_or_create(&RequestLabels {
2861 method: "GET".to_string(),
2862 status: 200,
2863 })
2864 .inc();
2865 requests
2866 .get_or_create(&RequestLabels {
2867 method: "POST".to_string(),
2868 status: 201,
2869 })
2870 .inc();
2871 requests
2872 .get_or_create(&RequestLabels {
2873 method: "GET".to_string(),
2874 status: 404,
2875 })
2876 .inc();
2877
2878 let output = context.encode();
2879
2880 assert!(
2884 output.contains(
2885 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
2886 ),
2887 "GET 200 should have merged labels: {output}"
2888 );
2889 assert!(
2890 output.contains(
2891 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
2892 ),
2893 "POST 201 should have merged labels: {output}"
2894 );
2895 assert!(
2896 output.contains(
2897 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
2898 ),
2899 "GET 404 should have merged labels: {output}"
2900 );
2901
2902 let ctx_plain = context.with_label("api_plain");
2904 let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
2905 ctx_plain.register("requests", "HTTP requests", plain_requests.clone());
2906
2907 plain_requests
2908 .get_or_create(&RequestLabels {
2909 method: "DELETE".to_string(),
2910 status: 204,
2911 })
2912 .inc();
2913
2914 let output = context.encode();
2915
2916 assert!(
2918 output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
2919 "plain DELETE should have only family labels: {output}"
2920 );
2921 assert!(
2922 !output.contains("api_plain_requests_total{env="),
2923 "plain should not have env attribute: {output}"
2924 );
2925 assert!(
2926 !output.contains("api_plain_requests_total{region="),
2927 "plain should not have region attribute: {output}"
2928 );
2929 });
2930 }
2931
/// Runs `test_metrics_family_with_attributes` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(deterministic::Runner::default());
}
2937
/// Runs `test_metrics_family_with_attributes` with a default `tokio::Runner`.
#[test]
fn test_tokio_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(tokio::Runner::default());
}
2943
2944 fn test_with_scope_register_and_encode<R: Runner>(runner: R)
2945 where
2946 R::Context: Metrics,
2947 {
2948 runner.start(|context| async move {
2949 let scoped = context.with_label("engine").with_scope();
2950 let counter = Counter::<u64>::default();
2951 scoped.register("votes", "vote count", counter.clone());
2952 counter.inc();
2953
2954 let buffer = context.encode();
2955 assert!(
2956 buffer.contains("engine_votes_total 1"),
2957 "scoped metric should appear in encode: {buffer}"
2958 );
2959 });
2960 }
2961
/// Runs `test_with_scope_register_and_encode` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_with_scope_register_and_encode() {
    test_with_scope_register_and_encode(deterministic::Runner::default());
}
2967
/// Runs `test_with_scope_register_and_encode` with a default `tokio::Runner`.
#[test]
fn test_tokio_with_scope_register_and_encode() {
    test_with_scope_register_and_encode(tokio::Runner::default());
}
2973
2974 fn test_with_scope_drop_removes_metrics<R: Runner>(runner: R)
2975 where
2976 R::Context: Metrics,
2977 {
2978 runner.start(|context| async move {
2979 let permanent = Counter::<u64>::default();
2981 context.with_label("permanent").register(
2982 "counter",
2983 "permanent counter",
2984 permanent.clone(),
2985 );
2986 permanent.inc();
2987
2988 let scoped = context.with_label("engine").with_scope();
2990 let counter = Counter::<u64>::default();
2991 scoped.register("votes", "vote count", counter.clone());
2992 counter.inc();
2993
2994 let buffer = context.encode();
2996 assert!(buffer.contains("permanent_counter_total 1"));
2997 assert!(buffer.contains("engine_votes_total 1"));
2998
2999 drop(scoped);
3001
3002 let buffer = context.encode();
3004 assert!(
3005 buffer.contains("permanent_counter_total 1"),
3006 "permanent metric should survive scope drop: {buffer}"
3007 );
3008 assert!(
3009 !buffer.contains("engine_votes"),
3010 "scoped metric should be removed after scope drop: {buffer}"
3011 );
3012 });
3013 }
3014
/// Runs `test_with_scope_drop_removes_metrics` with a default
/// `deterministic::Runner`.
#[test]
fn test_deterministic_with_scope_drop_removes_metrics() {
    test_with_scope_drop_removes_metrics(deterministic::Runner::default());
}
3020
/// Runs `test_with_scope_drop_removes_metrics` with a default `tokio::Runner`.
#[test]
fn test_tokio_with_scope_drop_removes_metrics() {
    test_with_scope_drop_removes_metrics(tokio::Runner::default());
}
3026
3027 fn test_with_scope_attributes<R: Runner>(runner: R)
3028 where
3029 R::Context: Metrics,
3030 {
3031 runner.start(|context| async move {
3032 let epoch1 = context
3034 .with_label("engine")
3035 .with_attribute("epoch", 1)
3036 .with_scope();
3037 let c1 = Counter::<u64>::default();
3038 epoch1.register("votes", "vote count", c1.clone());
3039 c1.inc();
3040
3041 let epoch2 = context
3042 .with_label("engine")
3043 .with_attribute("epoch", 2)
3044 .with_scope();
3045 let c2 = Counter::<u64>::default();
3046 epoch2.register("votes", "vote count", c2.clone());
3047 c2.inc();
3048 c2.inc();
3049
3050 let buffer = context.encode();
3052 assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
3053 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
3054
3055 assert_eq!(
3057 buffer.matches("# HELP engine_votes").count(),
3058 1,
3059 "HELP should appear once: {buffer}"
3060 );
3061 assert_eq!(
3062 buffer.matches("# TYPE engine_votes").count(),
3063 1,
3064 "TYPE should appear once: {buffer}"
3065 );
3066
3067 drop(epoch1);
3069 let buffer = context.encode();
3070 assert!(
3071 !buffer.contains("epoch=\"1\""),
3072 "epoch 1 should be gone: {buffer}"
3073 );
3074 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
3075
3076 drop(epoch2);
3078 let buffer = context.encode();
3079 assert!(
3080 !buffer.contains("engine_votes"),
3081 "all epoch metrics should be gone: {buffer}"
3082 );
3083 });
3084 }
3085
/// Runs `test_with_scope_attributes` on a default deterministic runner.
#[test]
fn test_deterministic_with_scope_attributes() {
    test_with_scope_attributes(deterministic::Runner::default());
}
3091
/// Runs `test_with_scope_attributes` on a default tokio runner.
#[test]
fn test_tokio_with_scope_attributes() {
    test_with_scope_attributes(tokio::Runner::default());
}
3097
3098 fn test_with_scope_inherits_on_with_label<R: Runner>(runner: R)
3099 where
3100 R::Context: Metrics,
3101 {
3102 runner.start(|context| async move {
3103 let scoped = context.with_label("engine").with_scope();
3104
3105 let child = scoped.with_label("batcher");
3107 let counter = Counter::<u64>::default();
3108 child.register("msgs", "message count", counter.clone());
3109 counter.inc();
3110
3111 let buffer = context.encode();
3112 assert!(buffer.contains("engine_batcher_msgs_total 1"));
3113
3114 drop(child);
3116 drop(scoped);
3117 let buffer = context.encode();
3118 assert!(
3119 !buffer.contains("engine_batcher_msgs"),
3120 "child metric should be removed with scope: {buffer}"
3121 );
3122 });
3123 }
3124
/// Runs `test_with_scope_inherits_on_with_label` on a default deterministic runner.
#[test]
fn test_deterministic_with_scope_inherits_on_with_label() {
    test_with_scope_inherits_on_with_label(deterministic::Runner::default());
}
3130
/// Runs `test_with_scope_inherits_on_with_label` on a default tokio runner.
#[test]
fn test_tokio_with_scope_inherits_on_with_label() {
    test_with_scope_inherits_on_with_label(tokio::Runner::default());
}
3136
3137 fn test_multiple_scopes<R: Runner>(runner: R)
3138 where
3139 R::Context: Metrics,
3140 {
3141 runner.start(|context| async move {
3142 let ctx_a = context.with_label("a").with_scope();
3143 let ctx_b = context.with_label("b").with_scope();
3144
3145 let ca = Counter::<u64>::default();
3146 ctx_a.register("counter", "a counter", ca.clone());
3147 ca.inc();
3148
3149 let cb = Counter::<u64>::default();
3150 ctx_b.register("counter", "b counter", cb.clone());
3151 cb.inc();
3152 cb.inc();
3153
3154 let buffer = context.encode();
3155 assert!(buffer.contains("a_counter_total 1"));
3156 assert!(buffer.contains("b_counter_total 2"));
3157
3158 drop(ctx_a);
3160 let buffer = context.encode();
3161 assert!(!buffer.contains("a_counter"));
3162 assert!(buffer.contains("b_counter_total 2"));
3163
3164 drop(ctx_b);
3166 let buffer = context.encode();
3167 assert!(!buffer.contains("b_counter"));
3168 });
3169 }
3170
/// Runs `test_multiple_scopes` on a default deterministic runner.
#[test]
fn test_deterministic_multiple_scopes() {
    test_multiple_scopes(deterministic::Runner::default());
}
3176
/// Runs `test_multiple_scopes` on a default tokio runner.
#[test]
fn test_tokio_multiple_scopes() {
    test_multiple_scopes(tokio::Runner::default());
}
3182
3183 fn test_encode_single_eof<R: Runner>(runner: R)
3184 where
3185 R::Context: Metrics,
3186 {
3187 runner.start(|context| async move {
3188 let root_counter = Counter::<u64>::default();
3189 context.register("root", "root metric", root_counter.clone());
3190 root_counter.inc();
3191
3192 let scoped = context.with_label("engine").with_scope();
3193 let scoped_counter = Counter::<u64>::default();
3194 scoped.register("ops", "scoped metric", scoped_counter.clone());
3195 scoped_counter.inc();
3196
3197 let buffer = context.encode();
3198 assert!(
3199 buffer.contains("root_total 1"),
3200 "root metric missing: {buffer}"
3201 );
3202 assert!(
3203 buffer.contains("engine_ops_total 1"),
3204 "scoped metric missing: {buffer}"
3205 );
3206 assert_eq!(
3207 buffer.matches("# EOF").count(),
3208 1,
3209 "expected exactly one EOF marker: {buffer}"
3210 );
3211 assert!(
3212 buffer.ends_with("# EOF\n"),
3213 "EOF must be the last line: {buffer}"
3214 );
3215 });
3216 }
3217
/// Runs `test_encode_single_eof` on a default deterministic runner.
#[test]
fn test_deterministic_encode_single_eof() {
    test_encode_single_eof(deterministic::Runner::default());
}
3223
/// Runs `test_encode_single_eof` on a default tokio runner.
#[test]
fn test_tokio_encode_single_eof() {
    test_encode_single_eof(tokio::Runner::default());
}
3229
3230 fn test_with_scope_nested_inherits<R: Runner>(runner: R)
3231 where
3232 R::Context: Metrics,
3233 {
3234 runner.start(|context| async move {
3235 let scoped = context.with_label("engine").with_scope();
3236
3237 let nested = scoped.with_scope();
3239 let counter = Counter::<u64>::default();
3240 nested.register("votes", "vote count", counter.clone());
3241 counter.inc();
3242
3243 let buffer = context.encode();
3244 assert!(
3245 buffer.contains("engine_votes_total 1"),
3246 "nested scope should inherit parent scope: {buffer}"
3247 );
3248
3249 drop(nested);
3252 let buffer = context.encode();
3253 assert!(
3254 buffer.contains("engine_votes_total 1"),
3255 "metrics should survive as long as any scope clone exists: {buffer}"
3256 );
3257
3258 drop(scoped);
3260 let buffer = context.encode();
3261 assert!(
3262 !buffer.contains("engine_votes"),
3263 "metrics should be removed when all scope clones are dropped: {buffer}"
3264 );
3265 });
3266 }
3267
/// Runs `test_with_scope_nested_inherits` on a default deterministic runner.
#[test]
fn test_deterministic_with_scope_nested_inherits() {
    test_with_scope_nested_inherits(deterministic::Runner::default());
}
3273
/// Runs `test_with_scope_nested_inherits` on a default tokio runner.
#[test]
fn test_tokio_with_scope_nested_inherits() {
    test_with_scope_nested_inherits(tokio::Runner::default());
}
3279
/// Registers `votes` under a scoped context (`engine`, `epoch = 1`), drops that
/// context, then registers the same name under a freshly created scoped
/// context with identical label and attribute. The test expects a panic whose
/// message contains `duplicate metric:`.
#[test]
#[should_panic(expected = "duplicate metric:")]
fn test_deterministic_reregister_after_scope_drop() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // First registration, followed by dropping its scoped context.
        let first = context
            .with_label("engine")
            .with_attribute("epoch", 1)
            .with_scope();
        let first_votes: Counter<u64> = Counter::default();
        first.register("votes", "vote count", first_votes);
        drop(first);

        // Second registration with the same label, attribute, and name.
        let second = context
            .with_label("engine")
            .with_attribute("epoch", 1)
            .with_scope();
        let second_votes: Counter<u64> = Counter::default();
        second.register("votes", "vote count", second_votes);
    });
}
3302
3303 fn test_with_scope_family_with_attributes<R: Runner>(runner: R)
3304 where
3305 R::Context: Metrics,
3306 {
3307 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
3308 struct Peer {
3309 name: String,
3310 }
3311 impl EncodeLabelSet for Peer {
3312 fn encode(
3313 &self,
3314 encoder: &mut prometheus_client::encoding::LabelSetEncoder<'_>,
3315 ) -> Result<(), std::fmt::Error> {
3316 let mut label = encoder.encode_label();
3317 let mut key = label.encode_label_key()?;
3318 EncodeLabelKey::encode(&"peer", &mut key)?;
3319 let mut value = key.encode_label_value()?;
3320 EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
3321 value.finish()
3322 }
3323 }
3324
3325 runner.start(|context| async move {
3326 let scoped = context
3327 .with_label("batcher")
3328 .with_attribute("epoch", 1)
3329 .with_scope();
3330
3331 let family: Family<Peer, Counter> = Family::default();
3332 scoped.register("votes", "votes per peer", family.clone());
3333 family
3334 .get_or_create(&Peer {
3335 name: "alice".into(),
3336 })
3337 .inc();
3338 family.get_or_create(&Peer { name: "bob".into() }).inc();
3339
3340 let buffer = context.encode();
3341 assert!(
3342 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
3343 "family with attributes should combine labels: {buffer}"
3344 );
3345 assert!(
3346 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
3347 "family with attributes should combine labels: {buffer}"
3348 );
3349
3350 drop(scoped);
3351 let buffer = context.encode();
3352 assert!(
3353 !buffer.contains("batcher_votes"),
3354 "family metrics should be removed: {buffer}"
3355 );
3356 });
3357 }
3358
/// Runs `test_with_scope_family_with_attributes` on a default deterministic runner.
#[test]
fn test_deterministic_with_scope_family_with_attributes() {
    test_with_scope_family_with_attributes(deterministic::Runner::default());
}
3364
/// Runs `test_with_scope_family_with_attributes` on a default tokio runner.
#[test]
fn test_tokio_with_scope_family_with_attributes() {
    test_with_scope_family_with_attributes(tokio::Runner::default());
}
3370
/// Runs `test_error_future` on a default deterministic runner.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
3376
/// Runs `test_clock_sleep` on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
3382
/// Runs `test_clock_sleep_until` on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
3388
/// Runs `test_clock_sleep_until_far_future` on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until_far_future() {
    test_clock_sleep_until_far_future(deterministic::Runner::default());
}
3394
/// Runs `test_clock_timeout` on a default deterministic runner.
#[test]
fn test_deterministic_clock_timeout() {
    test_clock_timeout(deterministic::Runner::default());
}
3400
/// Runs `test_root_finishes` on a default deterministic runner.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
3406
/// Runs `test_spawn_after_abort` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_after_abort() {
    test_spawn_after_abort(deterministic::Runner::default());
}
3412
/// Runs `test_spawn_abort` on a default deterministic runner with both flags
/// set to `false`.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default(), false, false);
}
3418
/// Runs `test_panic_aborts_root` on a default deterministic runner; expects a
/// panic whose message contains `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
3425
/// Runs `test_panic_aborts_root` on a deterministic runner configured with
/// `with_catch_panics(true)`; still expects a panic containing `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
3433
/// Runs `test_panic_aborts_spawn` on a default deterministic runner; expects a
/// panic whose message contains `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
3440
/// Runs `test_panic_aborts_spawn_caught` on a deterministic runner configured
/// with `with_catch_panics(true)`.
#[test]
fn test_deterministic_panic_aborts_spawn_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
3447
/// Runs `test_multiple_panics` on a default deterministic runner; expects a
/// panic whose message contains `boom`.
#[test]
#[should_panic(expected = "boom")]
fn test_deterministic_multiple_panics() {
    test_multiple_panics(deterministic::Runner::default());
}
3454
/// Runs `test_multiple_panics_caught` on a deterministic runner configured
/// with `with_catch_panics(true)`.
#[test]
fn test_deterministic_multiple_panics_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
3461
/// Runs `test_select` on a default deterministic runner.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
3467
/// Runs `test_select_loop` on a default deterministic runner.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
3473
/// Runs `test_storage_operations` on a default deterministic runner.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
3479
/// Runs `test_blob_read_write` on a default deterministic runner.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
3485
/// Runs `test_blob_resize` on a default deterministic runner.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
3491
/// Runs `test_many_partition_read_write` on a default deterministic runner.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
3497
/// Runs `test_blob_read_past_length` on a default deterministic runner.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
3503
/// Runs `test_blob_clone_and_concurrent_read` on a default deterministic runner.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
3510
/// Runs `test_shutdown` on a default deterministic runner.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
3516
/// Runs `test_shutdown_multiple_signals` on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
3522
/// Runs `test_shutdown_timeout` on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
3528
/// Runs `test_shutdown_multiple_stop_calls` on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
3534
/// Runs `test_unfulfilled_shutdown` on a default deterministic runner.
#[test]
fn test_deterministic_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(deterministic::Runner::default());
}
3540
/// Runs `test_spawn_dedicated` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_dedicated() {
    test_spawn_dedicated(deterministic::Runner::default());
}
3546
/// Runs `test_spawn` on a default deterministic runner.
#[test]
fn test_deterministic_spawn() {
    test_spawn(deterministic::Runner::default());
}
3552
/// Runs `test_spawn_abort_on_parent_abort` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(deterministic::Runner::default());
}
3558
/// Runs `test_spawn_abort_on_parent_completion` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(deterministic::Runner::default());
}
3564
/// Runs `test_spawn_cascading_abort` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_cascading_abort() {
    test_spawn_cascading_abort(deterministic::Runner::default());
}
3570
/// Runs `test_child_survives_sibling_completion` on a default deterministic runner.
#[test]
fn test_deterministic_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(deterministic::Runner::default());
}
3576
/// Runs `test_spawn_clone_chain` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_clone_chain() {
    test_spawn_clone_chain(deterministic::Runner::default());
}
3582
/// Runs `test_spawn_sparse_clone_chain` on a default deterministic runner.
#[test]
fn test_deterministic_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(deterministic::Runner::default());
}
3588
/// Runs `test_spawn_blocking` twice, once per value of `dedicated`, each time
/// on a fresh default deterministic runner.
#[test]
fn test_deterministic_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    }
}
3596
/// Runs `test_spawn_blocking_panic` with `dedicated = false` on a default
/// deterministic runner; expects a panic containing `blocking task panicked`.
///
/// The `dedicated = true` case lives in its own test below: under
/// `#[should_panic]` the first panic ends the test, so a loop over both values
/// would never reach the second one.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    test_spawn_blocking_panic(deterministic::Runner::default(), false);
}

/// Runs `test_spawn_blocking_panic` with `dedicated = true` on a default
/// deterministic runner; expects a panic containing `blocking task panicked`.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic_dedicated() {
    test_spawn_blocking_panic(deterministic::Runner::default(), true);
}
3605
/// Runs `test_spawn_blocking_panic_caught` once per value of `dedicated`, each
/// time on a fresh deterministic runner configured with
/// `with_catch_panics(true)`.
#[test]
fn test_deterministic_spawn_blocking_panic_caught() {
    for dedicated in [false, true] {
        let runner = deterministic::Runner::new(
            deterministic::Config::default().with_catch_panics(true),
        );
        test_spawn_blocking_panic_caught(runner, dedicated);
    }
}
3614
/// Runs `test_spawn_abort` for the `(dedicated, blocking)` pairs
/// `(false, true)` and `(true, false)`, each on a fresh default deterministic
/// runner.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    for (dedicated, blocking) in [(false, true), (true, false)] {
        test_spawn_abort(deterministic::Runner::default(), dedicated, blocking);
    }
}
3622
/// Runs `test_circular_reference_prevents_cleanup` on a default deterministic runner.
#[test]
fn test_deterministic_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(deterministic::Runner::default());
}
3628
/// Runs `test_late_waker` on a default deterministic runner.
#[test]
fn test_deterministic_late_waker() {
    test_late_waker(deterministic::Runner::default());
}
3634
/// Runs `test_metrics` on a default deterministic runner.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
3640
3641 #[test_collect_traces]
3642 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
3643 let executor = deterministic::Runner::new(deterministic::Config::default());
3644 executor.start(|context| async move {
3645 context
3646 .with_label("test")
3647 .with_span()
3648 .spawn(|context| async move {
3649 tracing::info!(field = "test field", "test log");
3650
3651 context
3652 .with_label("inner")
3653 .with_span()
3654 .spawn(|_| async move {
3655 tracing::info!("inner log");
3656 })
3657 .await
3658 .unwrap();
3659 })
3660 .await
3661 .unwrap();
3662 });
3663
3664 let info_traces = traces.get_by_level(Level::INFO);
3665 assert_eq!(info_traces.len(), 2);
3666
3667 info_traces
3669 .expect_event_at_index(0, |event| {
3670 event.metadata.expect_content_exact("test log")?;
3671 event.metadata.expect_field_count(1)?;
3672 event.metadata.expect_field_exact("field", "test field")?;
3673 event.expect_span_count(1)?;
3674 event.expect_span_at_index(0, |span| {
3675 span.expect_content_exact("task")?;
3676 span.expect_field_count(1)?;
3677 span.expect_field_exact("name", "test")
3678 })
3679 })
3680 .unwrap();
3681
3682 info_traces
3683 .expect_event_at_index(1, |event| {
3684 event.metadata.expect_content_exact("inner log")?;
3685 event.metadata.expect_field_count(0)?;
3686 event.expect_span_count(1)?;
3687 event.expect_span_at_index(0, |span| {
3688 span.expect_content_exact("task")?;
3689 span.expect_field_count(1)?;
3690 span.expect_field_exact("name", "test_inner")
3691 })
3692 })
3693 .unwrap();
3694 }
3695
/// Exercises the deterministic runtime's resolver: a registered host resolves
/// to the registered addresses, an unregistered host fails with
/// `Error::ResolveFailed`, and registering `None` for a host makes later
/// lookups of that host fail as well.
#[test]
fn test_deterministic_resolver() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Register two addresses for the host.
        let first = "192.168.1.1".parse::<IpAddr>().unwrap();
        let second = "192.168.1.2".parse::<IpAddr>().unwrap();
        context.resolver_register("example.com", Some(vec![first, second]));

        // The registered host resolves to exactly those addresses.
        let resolved = context.resolve("example.com").await.unwrap();
        assert_eq!(resolved, vec![first, second]);

        // A host that was never registered fails to resolve.
        let unknown = context.resolve("unknown.com").await;
        assert!(matches!(unknown, Err(Error::ResolveFailed(_))));

        // After registering `None`, the host no longer resolves.
        context.resolver_register("example.com", None);
        let removed = context.resolve("example.com").await;
        assert!(matches!(removed, Err(Error::ResolveFailed(_))));
    });
}
3719
/// Runs `test_error_future` on a default tokio runner.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
3725
/// Runs `test_clock_sleep` on a default tokio runner.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
3731
/// Runs `test_clock_sleep_until` on a default tokio runner.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
3737
/// Runs `test_clock_sleep_until_far_future` on a default tokio runner.
#[test]
fn test_tokio_clock_sleep_until_far_future() {
    test_clock_sleep_until_far_future(tokio::Runner::default());
}
3743
/// Runs `test_clock_timeout` on a default tokio runner.
#[test]
fn test_tokio_clock_timeout() {
    test_clock_timeout(tokio::Runner::default());
}
3749
/// Runs `test_root_finishes` on a default tokio runner.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
3755
/// Runs `test_spawn_after_abort` on a default tokio runner.
#[test]
fn test_tokio_spawn_after_abort() {
    test_spawn_after_abort(tokio::Runner::default());
}
3761
/// Runs `test_spawn_abort` on a default tokio runner with both flags set to
/// `false`.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default(), false, false);
}
3767
/// Runs `test_panic_aborts_root` on a default tokio runner; expects a panic
/// whose message contains `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
3774
/// Runs `test_panic_aborts_root` on a tokio runner configured with
/// `with_catch_panics(true)`; still expects a panic containing `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
3782
/// Runs `test_panic_aborts_spawn` on a default tokio runner; expects a panic
/// whose message contains `blah`.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
3789
/// Runs `test_panic_aborts_spawn_caught` on a tokio runner configured with
/// `with_catch_panics(true)`.
#[test]
fn test_tokio_panic_aborts_spawn_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
3796
/// Runs `test_multiple_panics` on a default tokio runner; expects a panic
/// whose message contains `boom`.
#[test]
#[should_panic(expected = "boom")]
fn test_tokio_multiple_panics() {
    test_multiple_panics(tokio::Runner::default());
}
3803
/// Runs `test_multiple_panics_caught` on a tokio runner configured with
/// `with_catch_panics(true)`.
#[test]
fn test_tokio_multiple_panics_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
3810
/// Runs `test_select` on a default tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
3816
/// Runs `test_select_loop` on a default tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
3822
/// Runs `test_storage_operations` on a default tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
3828
/// Runs `test_blob_read_write` on a default tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
3834
/// Runs `test_blob_resize` on a default tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
3840
/// Runs `test_many_partition_read_write` on a default tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
3846
/// Runs `test_blob_read_past_length` on a default tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
3852
/// Runs `test_blob_clone_and_concurrent_read` on a default tokio runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
3859
/// Runs `test_shutdown` on a default tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
3865
/// Runs `test_shutdown_multiple_signals` on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
3871
/// Runs `test_shutdown_timeout` on a default tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
3877
/// Runs `test_shutdown_multiple_stop_calls` on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
3883
/// Runs `test_unfulfilled_shutdown` on a default tokio runner.
#[test]
fn test_tokio_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(tokio::Runner::default());
}
3889
/// Runs `test_spawn_dedicated` on a default tokio runner.
#[test]
fn test_tokio_spawn_dedicated() {
    test_spawn_dedicated(tokio::Runner::default());
}
3895
/// Runs `test_spawn` on a default tokio runner.
#[test]
fn test_tokio_spawn() {
    test_spawn(tokio::Runner::default());
}
3901
/// Runs `test_spawn_abort_on_parent_abort` on a default tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(tokio::Runner::default());
}
3907
/// Runs `test_spawn_abort_on_parent_completion` on a default tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(tokio::Runner::default());
}
3913
/// Runs `test_spawn_cascading_abort` on a default tokio runner.
#[test]
fn test_tokio_spawn_cascading_abort() {
    test_spawn_cascading_abort(tokio::Runner::default());
}
3919
/// Runs `test_child_survives_sibling_completion` on a default tokio runner.
#[test]
fn test_tokio_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(tokio::Runner::default());
}
3925
/// Runs `test_spawn_clone_chain` on a default tokio runner.
#[test]
fn test_tokio_spawn_clone_chain() {
    test_spawn_clone_chain(tokio::Runner::default());
}
3931
/// Runs `test_spawn_sparse_clone_chain` on a default tokio runner.
#[test]
fn test_tokio_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(tokio::Runner::default());
}
3937
/// Runs `test_spawn_blocking` twice, once per value of `dedicated`, each time
/// on a fresh default tokio runner.
#[test]
fn test_tokio_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    }
}
3945
/// Runs `test_spawn_blocking_panic` with `dedicated = false` on a default
/// tokio runner; expects a panic containing `blocking task panicked`.
///
/// The `dedicated = true` case lives in its own test below: under
/// `#[should_panic]` the first panic ends the test, so a loop over both values
/// would never reach the second one.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic() {
    test_spawn_blocking_panic(tokio::Runner::default(), false);
}

/// Runs `test_spawn_blocking_panic` with `dedicated = true` on a default
/// tokio runner; expects a panic containing `blocking task panicked`.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic_dedicated() {
    test_spawn_blocking_panic(tokio::Runner::default(), true);
}
3954
/// Runs `test_spawn_blocking_panic_caught` once per value of `dedicated`, each
/// time on a fresh tokio runner configured with `with_catch_panics(true)`.
#[test]
fn test_tokio_spawn_blocking_panic_caught() {
    for dedicated in [false, true] {
        let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
        test_spawn_blocking_panic_caught(runner, dedicated);
    }
}
3963
/// Runs `test_spawn_abort` for the `(dedicated, blocking)` pairs
/// `(false, true)` and `(true, false)`, each on a fresh default tokio runner.
#[test]
fn test_tokio_spawn_blocking_abort() {
    for (dedicated, blocking) in [(false, true), (true, false)] {
        test_spawn_abort(tokio::Runner::default(), dedicated, blocking);
    }
}
3971
/// Runs `test_circular_reference_prevents_cleanup` on a default tokio runner.
#[test]
fn test_tokio_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(tokio::Runner::default());
}
3977
/// Runs `test_late_waker` on a default tokio runner.
#[test]
fn test_tokio_late_waker() {
    test_late_waker(tokio::Runner::default());
}
3983
/// Runs `test_metrics` on a default tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
3989
/// Polls the tokio runtime's encoded metrics until an unlabeled
/// `runtime_process_rss` sample with a positive value is observed.
///
/// Every unsuccessful poll is followed by a 100ms sleep. This includes the
/// case where the metric name is already present in the output but no positive
/// sample has been reported yet, so the loop never re-encodes without yielding.
///
/// # Panics
///
/// Panics if the value of a matching sample line cannot be parsed as `i64`.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Consider only sample lines for the bare metric name (this skips
            // `# HELP`/`# TYPE` lines and labeled samples) and take the token
            // after the name as the value.
            let reported = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                .filter_map(|line| line.split_whitespace().nth(1))
                .any(|value| value.parse::<i64>().expect("Failed to parse RSS value") > 0);
            if reported {
                return;
            }

            // Not reported yet (or still zero): wait before polling again.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
4020
/// Starts the tokio telemetry endpoint on `127.0.0.1:8000`, registers and
/// increments a counter, then issues a raw HTTP/1.1 `GET /metrics` request
/// over the runtime's own network interface and checks that the response is
/// `200 OK` and that its body contains the counter sample.
///
/// Header names are stored lowercased, so the `content-length` lookup does
/// not depend on the casing the server uses (HTTP field names are
/// case-insensitive).
///
/// NOTE(review): the listen address uses a fixed port; this test presumably
/// fails if another process already holds port 8000 — confirm whether that is
/// acceptable for the CI environment.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Address the telemetry server listens on.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry with plain-text INFO logging and a metrics endpoint.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter that should show up in the served metrics.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes one at a time up to `\n` and returns the line without
        /// its terminator (a trailing `\r` is stripped too).
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let received = stream.recv(1).await?;
                let byte = received.coalesce().as_ref()[0];
                if byte == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                    break;
                }
                line.push(byte);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines until the first empty line and returns them as
        /// a map keyed by the ASCII-lowercased field name. Lines without a
        /// `": "` separator are ignored.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_ascii_lowercase(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let received = stream.recv(content_length).await?;
            String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
        }

        // Client task: connect, request `/metrics`, and validate the response.
        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial until the server accepts connections.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send the request.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request)).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers; the body length comes from `content-length`.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Body must contain the registered counter's sample.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Propagate any failure from the client task.
        client_handle.await.unwrap();
    });
}
4129
/// Resolves `localhost` through the tokio runtime and checks that the result
/// is non-empty and that every returned address is the IPv4 or IPv6 loopback
/// address.
#[test]
fn test_tokio_resolver() {
    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        let resolved = context.resolve("localhost").await.unwrap();
        assert!(!resolved.is_empty());

        let loopbacks = [
            IpAddr::V4(Ipv4Addr::LOCALHOST),
            IpAddr::V6(Ipv6Addr::LOCALHOST),
        ];
        for addr in resolved {
            assert!(loopbacks.contains(&addr));
        }
    });
}
4144
/// Creates a 4-thread pool from a labeled tokio context and uses it to sum
/// `0..10000` in parallel, comparing against the closed-form result.
#[test]
fn test_create_thread_pool_tokio() {
    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        // Build the pool from a labeled context.
        let pool = context
            .with_label("pool")
            .create_thread_pool(NZUsize!(4))
            .unwrap();

        // Input data for the parallel sum.
        let values: Vec<_> = (0..10000).collect();

        // Run the parallel sum inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, 10000 * 9999 / 2);
        });
    });
}
4164
/// Creates a 4-thread pool from a labeled deterministic context and uses it to
/// sum `0..10000` in parallel, comparing against the closed-form result.
#[test]
fn test_create_thread_pool_deterministic() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Build the pool from a labeled context.
        let pool = context
            .with_label("pool")
            .create_thread_pool(NZUsize!(4))
            .unwrap();

        // Input data for the parallel sum.
        let values: Vec<_> = (0..10000).collect();

        // Run the parallel sum inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, 10000 * 9999 / 2);
        });
    });
}
4184
4185 fn test_buffer_pooler<R: Runner>(
4186 runner: R,
4187 expected_network_max_per_class: usize,
4188 expected_storage_max_per_class: usize,
4189 ) where
4190 R::Context: BufferPooler,
4191 {
4192 runner.start(|context| async move {
4193 let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
4195 assert!(net_buf.capacity() >= 1024);
4196
4197 let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
4199 assert!(storage_buf.capacity() >= 4096);
4200
4201 assert_eq!(
4203 context.network_buffer_pool().config().max_per_class.get(),
4204 expected_network_max_per_class
4205 );
4206 assert_eq!(
4207 context.storage_buffer_pool().config().max_per_class.get(),
4208 expected_storage_max_per_class
4209 );
4210 });
4211 }
4212
/// Runs `test_buffer_pooler` on the deterministic runtime twice: once with the
/// default configuration (expecting `max_per_class` of 4096 for network and 64
/// for storage), and once with both pool configs overridden to 64 and 8.
#[test]
fn test_deterministic_buffer_pooler() {
    // Default configuration.
    test_buffer_pooler(deterministic::Runner::default(), 4096, 64);

    // Overridden per-class limits.
    let network = BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64));
    let storage = BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8));
    let cfg = deterministic::Config::default()
        .with_network_buffer_pool_config(network)
        .with_storage_buffer_pool_config(storage);
    test_buffer_pooler(deterministic::Runner::new(cfg), 64, 8);
}
4228
/// Runs `test_buffer_pooler` on the tokio runtime twice: once with the default
/// configuration (expecting `max_per_class` of 4096 for network and 64 for
/// storage), and once with both pool configs overridden to 64 and 8.
#[test]
fn test_tokio_buffer_pooler() {
    // Default configuration.
    test_buffer_pooler(tokio::Runner::default(), 4096, 64);

    // Overridden per-class limits.
    let network = BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64));
    let storage = BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8));
    let cfg = tokio::Config::default()
        .with_network_buffer_pool_config(network)
        .with_storage_buffer_pool_config(storage);
    test_buffer_pooler(tokio::Runner::new(cfg), 64, 8);
}
4244}