1#![doc(
20 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21 html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
// Module declarations, grouped by the stability level passed to `stability_scope!`.
// NOTE(review): the macro itself is defined in `commonware_macros`; presumably it gates
// the enclosed items on the named stability level and the optional `cfg(...)` — confirm.

// ALPHA, no additional `cfg` condition.
stability_scope!(ALPHA {
    pub mod deterministic;
    pub mod mocks;
});
// ALPHA, only when the target architecture is not `wasm32`.
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
    pub mod benchmarks;
});
// ALPHA, private module, only when the `iouring-storage` or `iouring-network` feature is on.
stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
    mod iouring;
});
// BETA, only when the target architecture is not `wasm32`.
stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
    pub mod tokio;
});
46stability_scope!(BETA {
47 pub use bytes::{Buf, BufMut};
49 use commonware_macros::select;
50 use commonware_parallel::{Rayon, ThreadPool};
51 pub use governor::Quota;
53 use iobuf::PoolError;
54 use rayon::ThreadPoolBuildError;
55 use std::{
56 future::Future,
57 io::Error as IoError,
58 net::SocketAddr,
59 num::NonZeroUsize,
60 time::{Duration, SystemTime},
61 };
62 pub(crate) use telemetry::metrics::{child_label, prefixed_name, METRICS_PREFIX};
63 use thiserror::Error;
64
65 pub mod iobuf;
66 pub use iobuf::{
67 cache_line_size, page_size, BufferPool, BufferPoolConfig, BufferPoolThreadCache,
68 Builder as IoBufsBuilder, IoBuf, IoBufMut, IoBufs, IoBufsMut,
69 };
70
71 pub mod utils;
72 pub use utils::*;
73
74 pub mod telemetry;
75
/// Blob version requested by the default [`Storage::open`] implementation, which passes
/// `DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION` to [`Storage::open_versioned`].
pub const DEFAULT_BLOB_VERSION: u16 = 0;
78
/// Errors returned by the runtime traits declared in this file.
///
/// Display text for each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum Error {
    /// Displayed as "exited". The tests in this file observe it when awaiting the handle of
    /// a spawned task that panicked.
    #[error("exited")]
    Exited,
    /// Displayed as "closed". The tests in this file observe it when awaiting the handle of
    /// an aborted task.
    #[error("closed")]
    Closed,
    /// Returned by [`Clock::timeout`] when the sleep branch completes first; the tests also
    /// observe it from [`Spawner::stop`] when a timeout is supplied.
    #[error("timeout")]
    Timeout,
    /// Displayed as "bind failed".
    #[error("bind failed")]
    BindFailed,
    /// Displayed as "connection failed".
    #[error("connection failed")]
    ConnectionFailed,
    /// Displayed as "write failed".
    #[error("write failed")]
    WriteFailed,
    /// Displayed as "read failed".
    #[error("read failed")]
    ReadFailed,
    /// Displayed as "send failed".
    #[error("send failed")]
    SendFailed,
    /// Displayed as "recv failed".
    #[error("recv failed")]
    RecvFailed,
    /// DNS resolution failure; the payload is appended to the message.
    #[error("dns resolution failed: {0}")]
    ResolveFailed(String),
    /// Partition name rejected; the payload is appended to the message
    /// (presumably the offending name — confirm against the storage implementations).
    #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
    PartitionNameInvalid(String),
    /// Partition could not be created; the payload is appended to the message.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// Partition does not exist. The tests in this file observe it when scanning a
    /// partition that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// Partition is corrupt; the payload is appended to the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed. Fields are rendered as `{0}/{1} error: {2}`
    /// (presumably partition, blob name, and the underlying I/O error — confirm).
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// Blob not found. Fields are rendered as `{0}/{1}` (presumably partition and blob name).
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Resizing a blob failed. Fields are rendered as `{0}/{1} error: {2}`.
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    /// Syncing a blob failed. Fields are rendered as `{0}/{1} error: {2}`.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Displayed as "blob insufficient length".
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// Blob is corrupt. Fields are rendered as `{0}/{1} reason: {2}`.
    #[error("blob corrupt: {0}/{1} reason: {2}")]
    BlobCorrupt(String, String, String),
    /// The version found for a blob is outside the accepted range.
    #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
    BlobVersionMismatch {
        /// Inclusive range of accepted versions.
        expected: std::ops::RangeInclusive<u16>,
        /// Version that was actually found.
        found: u16,
    },
    /// Displayed as "invalid or missing checksum".
    #[error("invalid or missing checksum")]
    InvalidChecksum,
    /// Displayed as "offset overflow".
    #[error("offset overflow")]
    OffsetOverflow,
    /// Wraps a `std::io::Error`; `#[from]` lets `?` convert it automatically.
    #[error("io error: {0}")]
    Io(#[from] IoError),
    /// Wraps a buffer pool error; `#[from]` lets `?` convert it automatically.
    #[error("buffer pool: {0}")]
    Pool(#[from] PoolError),
}
136
/// Entry point of a runtime: turns a root closure into a result.
pub trait Runner {
    /// Value handed to the root closure passed to [`Runner::start`].
    type Context;

    /// Consumes the runner, calls `f` with a [`Self::Context`], and returns the output of
    /// the future that `f` produces.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
153
/// Label and attribute pairs describing a [`Supervisor`], as returned by
/// [`Supervisor::name`].
#[derive(Clone, Debug, Default)]
pub struct Name {
    /// Textual label.
    pub label: String,
    /// Attribute pairs; each entry is two strings (presumably key then value, mirroring
    /// the `key`/`value` parameters of `Supervisor::with_attribute` — confirm).
    pub attributes: Vec<(String, String)>,
}
162
/// Base trait for runtime contexts: exposes a [`Name`] and ways to derive new contexts.
///
/// [`Spawner`], [`Tracing`] and [`Metrics`] all require it as a supertrait.
pub trait Supervisor: Send + Sync + 'static {
    /// Returns the [`Name`] associated with this context.
    fn name(&self) -> Name;

    /// Returns a new context derived from `self` and identified by `label`.
    ///
    /// NOTE(review): the exact parent/child relationship is defined by the implementations;
    /// the tests in this file expect tasks spawned from a child to be closed once the
    /// parent task has been aborted.
    #[must_use]
    fn child(&self, label: &'static str) -> Self;

    /// Consumes `self` and returns it with the `key`/`value` attribute applied; `value` is
    /// any `Display` type.
    #[must_use]
    fn with_attribute(self, key: &'static str, value: impl std::fmt::Display) -> Self;
}
247
/// Task spawning and shutdown signalling on top of [`Supervisor`].
pub trait Spawner: Supervisor {
    /// Consumes `self` and returns a context configured with the "shared" mode and the
    /// given `blocking` flag.
    ///
    /// NOTE(review): presumably selects a shared executor and declares whether the spawned
    /// task may block — confirm against the implementations.
    #[must_use]
    fn shared(self, blocking: bool) -> Self;

    /// Consumes `self` and returns a context configured with the "dedicated" mode.
    ///
    /// NOTE(review): presumably runs the spawned task on its own thread — confirm.
    #[must_use]
    fn dedicated(self) -> Self;

    /// Consumes `self`, passes it to `f`, and returns a `Handle<T>` for the future that `f`
    /// produces.
    ///
    /// The tests in this file await the handle and observe `Ok(value)` when the task
    /// completes, `Err(Error::Closed)` when it was aborted, and `Err(Error::Exited)` when
    /// it panicked.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        Self: Sized,
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Requests shutdown with `value`, optionally bounded by `timeout`.
    ///
    /// The tests in this file show that signals obtained from [`Spawner::stopped`] resolve
    /// to `value`, that the returned future resolves to `Ok(())` once the outstanding
    /// signals have been dropped, and that it resolves to `Err(Error::Timeout)` when a
    /// timeout is given and a task never drops its signal.
    fn stop(
        self,
        value: i32,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns a signal that can be awaited to learn the value passed to
    /// [`Spawner::stop`].
    fn stopped(&self) -> signal::Signal;
}
344
345 pub trait ThreadPooler: Spawner {
348 fn create_thread_pool(
357 &self,
358 concurrency: NonZeroUsize,
359 ) -> Result<ThreadPool, ThreadPoolBuildError>;
360
361 fn create_strategy(
370 &self,
371 concurrency: NonZeroUsize,
372 ) -> Result<Rayon, ThreadPoolBuildError> {
373 self.create_thread_pool(concurrency).map(Rayon::with_pool)
374 }
375 }
376
/// Tracing support on top of [`Supervisor`].
pub trait Tracing: Supervisor {
    /// Consumes `self` and returns a context with span creation enabled.
    ///
    /// NOTE(review): presumably the span wraps the next spawned task — confirm against the
    /// implementations.
    #[must_use]
    fn with_span(self) -> Self;
}
407
/// Metric registration and export on top of [`Supervisor`].
pub trait Metrics: Supervisor {
    /// Registers `metric` under `name` with the description `help`, returning a
    /// `Registered<M>` wrapper for it.
    fn register<N: Into<String>, H: Into<String>, M: telemetry::metrics::Metric>(
        &self,
        name: N,
        help: H,
        metric: M,
    ) -> telemetry::metrics::Registered<M>;

    /// Returns the metrics as text. The tests in this file search the output for lines
    /// such as `open_blobs 0`.
    fn encode(&self) -> String;
}
436
/// Marker trait combining [`Tracing`] and [`Metrics`]; it declares no methods of its own.
pub trait Observer: Tracing + Metrics {}
461
/// `governor::RateLimiter` specialised to a single, un-keyed in-memory state, the clock
/// `C`, and no-op middleware.
pub type RateLimiter<C> = governor::RateLimiter<
    governor::state::NotKeyed,
    governor::state::InMemoryState,
    C,
    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
>;
472
/// `governor::RateLimiter` keyed by `K`, with state kept in a `HashMapStateStore<K>`, the
/// clock `C`, and no-op middleware.
pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
    K,
    governor::state::keyed::HashMapStateStore<K>,
    C,
    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
>;
485
/// Time source for the runtime.
///
/// Implementors must also be a `governor` clock whose instant type is [`SystemTime`], so
/// the same value can drive the rate limiter aliases in this file.
pub trait Clock:
    governor::clock::Clock<Instant = SystemTime>
    + governor::clock::ReasonablyRealtime
    + Send
    + Sync
    + 'static
{
    /// Returns the current time according to this clock.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration` has passed on this clock.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once this clock reaches `deadline`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

    /// Races `future` against `self.sleep(duration)`.
    ///
    /// Resolves to `Ok(output)` if `future` finishes first and to `Err(Error::Timeout)` if
    /// the sleep finishes first.
    fn timeout<F, T>(
        &self,
        duration: Duration,
        future: F,
    ) -> impl Future<Output = Result<T, Error>> + Send + '_
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        async move {
            // The `future` arm is listed before the sleep arm.
            // NOTE(review): presumably `select!` favours the earlier arm when both are
            // ready — confirm against the macro definition in `commonware_macros`.
            select! {
                result = future => Ok(result),
                _ = self.sleep(duration) => Err(Error::Timeout),
            }
        }
    }
}
544
545 pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
547
548 pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
550
551 pub type ListenerOf<N> = <N as crate::Network>::Listener;
553
/// Interface for accepting and opening network connections.
pub trait Network: Send + Sync + 'static {
    /// Listener type returned by [`Network::bind`]; it also fixes the sink and stream
    /// types used by [`Network::dial`].
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener for it, or to an [`Error`].
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Connects to `socket` and resolves to a `(sink, stream)` pair, or to an [`Error`].
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
574
/// Interface for resolving host names to IP addresses.
pub trait Resolver: Send + Sync + 'static {
    /// Resolves `host` to a list of IP addresses, or to an [`Error`]
    /// (presumably [`Error::ResolveFailed`] — confirm against the implementations).
    fn resolve(
        &self,
        host: &str,
    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
}
585
/// Interface for accepting incoming connections.
pub trait Listener: Sync + Send + 'static {
    /// Write half handed out for each accepted connection.
    type Sink: Sink;
    /// Read half handed out for each accepted connection.
    type Stream: Stream;

    /// Resolves to `(address, sink, stream)` for the next connection, or to an [`Error`].
    ///
    /// NOTE(review): the address is presumably the remote peer's — confirm.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the address this listener is bound to, or the underlying I/O error.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
604
/// Write half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends `bufs` (anything convertible into [`IoBufs`]), resolving to `Ok(())` or an
    /// [`Error`].
    fn send(
        &mut self,
        bufs: impl Into<IoBufs> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
623
/// Read half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives data for the requested `len`, resolving to the received [`IoBufs`] or an
    /// [`Error`].
    ///
    /// NOTE(review): presumably resolves only once exactly `len` bytes are available —
    /// confirm against the implementations.
    fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;

    /// Returns a borrowed view of bytes without taking `&mut self`.
    ///
    /// NOTE(review): presumably already-buffered bytes, at most `max_len` of them —
    /// confirm.
    fn peek(&self, max_len: usize) -> &[u8];
}
650
651 pub trait Storage: Send + Sync + 'static {
664 type Blob: Blob;
666
667 fn open(
670 &self,
671 partition: &str,
672 name: &[u8],
673 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
674 async move {
675 let (blob, size, _) = self
676 .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
677 .await?;
678 Ok((blob, size))
679 }
680 }
681
682 fn open_versioned(
699 &self,
700 partition: &str,
701 name: &[u8],
702 versions: std::ops::RangeInclusive<u16>,
703 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
704
705 fn remove(
711 &self,
712 partition: &str,
713 name: Option<&[u8]>,
714 ) -> impl Future<Output = Result<(), Error>> + Send;
715
716 fn scan(&self, partition: &str)
718 -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
719 }
720
/// Handle to a single blob of bytes supporting positional reads and writes.
///
/// Handles are `Clone`; the tests in this file read concurrently through clones.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads `len` bytes starting at `offset`, using the caller-supplied `bufs`, and
    /// resolves to the filled buffers.
    fn read_at_buf(
        &self,
        offset: u64,
        len: usize,
        bufs: impl Into<IoBufsMut> + Send,
    ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

    /// Reads `len` bytes starting at `offset`.
    ///
    /// The tests in this file expect an error when the requested range extends past the
    /// end of the blob.
    fn read_at(
        &self,
        offset: u64,
        len: usize,
    ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

    /// Writes `bufs` starting at `offset`.
    fn write_at(
        &self,
        offset: u64,
        bufs: impl Into<IoBufs> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Writes `bufs` starting at `offset`.
    ///
    /// NOTE(review): presumably also makes the write durable before resolving, as the
    /// name suggests — confirm against the implementations.
    fn write_at_sync(
        &self,
        offset: u64,
        bufs: impl Into<IoBufs> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Changes the blob's length to `len`.
    ///
    /// The tests in this file expect bytes added by growing the blob to read back as
    /// zeros.
    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Syncs the blob.
    ///
    /// NOTE(review): presumably flushes pending writes to durable storage — confirm.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
}
797
/// Access to the buffer pools owned by a runtime.
pub trait BufferPooler: Send + Sync + 'static {
    /// Returns the [`BufferPool`] intended for network I/O.
    fn network_buffer_pool(&self) -> &BufferPool;

    /// Returns the [`BufferPool`] intended for storage I/O.
    fn storage_buffer_pool(&self) -> &BufferPool;
}
806});
807stability_scope!(BETA, cfg(feature = "external") {
/// Extension of [`Clock`] that wraps a future together with a `latency` value.
///
/// Only compiled when the `external` feature is enabled.
pub trait Pacer: Clock + Send + Sync + 'static {
    /// Wraps `future` and resolves to its output.
    ///
    /// NOTE(review): how `latency` affects completion is decided by the implementations —
    /// presumably it models the delay of an external operation; confirm.
    fn pace<'a, F, T>(
        &'a self,
        latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a;
}
838
839 pub trait FutureExt: Future + Send + Sized {
844 fn pace<'a, E>(
846 self,
847 pacer: &'a E,
848 latency: Duration,
849 ) -> impl Future<Output = Self::Output> + Send + 'a
850 where
851 E: Pacer + 'a,
852 Self: Send + 'a,
853 Self::Output: Send + 'a,
854 {
855 pacer.pace(latency, self)
856 }
857 }
858
859 impl<F> FutureExt for F where F: Future + Send {}
860});
861
862#[cfg(test)]
863mod tests {
864 use super::*;
865 use crate::telemetry::{
866 metrics::{
867 count_running_tasks,
868 raw::{Counter, Family},
869 EncodeLabelKey, EncodeLabelSetTrait as EncodeLabelSet,
870 EncodeLabelValueTrait as EncodeLabelValue, LabelSetEncoder,
871 },
872 traces::collector::TraceStorage,
873 };
874 use bytes::Bytes;
875 use commonware_macros::{select, test_collect_traces};
876 use commonware_utils::{
877 channel::{mpsc, oneshot},
878 sync::Mutex,
879 NZUsize, SystemTimeExt, NZU32,
880 };
881 use futures::{
882 future::{pending, ready},
883 join, pin_mut, FutureExt,
884 };
885 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
886 use std::{
887 collections::HashMap,
888 net::{IpAddr, Ipv4Addr, Ipv6Addr},
889 pin::Pin,
890 str::FromStr,
891 sync::{
892 atomic::{AtomicU32, Ordering},
893 Arc,
894 },
895 task::{Context as TContext, Poll, Waker},
896 };
897 use tracing::{error, Level};
898 use utils::reschedule;
899
900 fn test_error_future<R: Runner>(runner: R) {
901 #[allow(clippy::unused_async)]
902 async fn error_future() -> Result<&'static str, &'static str> {
903 Err("An error occurred")
904 }
905 let result = runner.start(|_| error_future());
906 assert_eq!(result, Err("An error occurred"));
907 }
908
909 fn test_clock_sleep<R: Runner>(runner: R)
910 where
911 R::Context: Spawner + Clock,
912 {
913 runner.start(|context| async move {
914 let start = context.current();
916 let sleep_duration = Duration::from_millis(10);
917 context.sleep(sleep_duration).await;
918
919 let end = context.current();
921 assert!(end.duration_since(start).unwrap() >= sleep_duration);
922 });
923 }
924
925 fn test_clock_sleep_until<R: Runner>(runner: R)
926 where
927 R::Context: Spawner + Clock + Metrics,
928 {
929 runner.start(|context| async move {
930 let now = context.current();
932 context.sleep_until(now + Duration::from_millis(100)).await;
933
934 let elapsed = now.elapsed().unwrap();
936 assert!(elapsed >= Duration::from_millis(100));
937 });
938 }
939
940 fn test_clock_sleep_until_far_future<R: Runner>(runner: R)
941 where
942 R::Context: Spawner + Clock,
943 {
944 runner.start(|context| async move {
945 let sleep = context.sleep_until(SystemTime::limit());
946 let result = context.timeout(Duration::from_millis(1), sleep).await;
947 assert!(matches!(result, Err(Error::Timeout)));
948 });
949 }
950
951 fn test_clock_timeout<R: Runner>(runner: R)
952 where
953 R::Context: Spawner + Clock,
954 {
955 runner.start(|context| async move {
956 let result = context
958 .timeout(Duration::from_millis(100), async { "success" })
959 .await;
960 assert_eq!(result.unwrap(), "success");
961
962 let result = context
964 .timeout(Duration::from_millis(50), pending::<()>())
965 .await;
966 assert!(matches!(result, Err(Error::Timeout)));
967
968 let result = context
970 .timeout(
971 Duration::from_millis(100),
972 context.sleep(Duration::from_millis(50)),
973 )
974 .await;
975 assert!(result.is_ok());
976 });
977 }
978
979 fn test_root_finishes<R: Runner>(runner: R)
980 where
981 R::Context: Spawner,
982 {
983 runner.start(|context| async move {
984 context.spawn(|_| async move {
985 loop {
986 reschedule().await;
987 }
988 });
989 });
990 }
991
992 fn test_spawn_after_abort<R>(runner: R)
993 where
994 R: Runner,
995 R::Context: Spawner,
996 {
997 runner.start(|context| async move {
998 let child = context.child("child");
1000
1001 let parent_handle = context.spawn(move |_| async move {
1003 pending::<()>().await;
1004 });
1005 parent_handle.abort();
1006
1007 let child_handle = child.spawn(move |_| async move {
1009 pending::<()>().await;
1010 });
1011 assert!(matches!(child_handle.await, Err(Error::Closed)));
1012 });
1013 }
1014
1015 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
1016 where
1017 R::Context: Spawner,
1018 {
1019 runner.start(|context| async move {
1020 let context = if dedicated {
1021 assert!(!blocking);
1022 context.dedicated()
1023 } else {
1024 context.shared(blocking)
1025 };
1026
1027 let handle = context.spawn(|_| async move {
1028 loop {
1029 reschedule().await;
1030 }
1031 });
1032 handle.abort();
1033 assert!(matches!(handle.await, Err(Error::Closed)));
1034 });
1035 }
1036
1037 fn test_panic_aborts_root<R: Runner>(runner: R) {
1038 let result: Result<(), Error> = runner.start(|_| async move {
1039 panic!("blah");
1040 });
1041 result.unwrap_err();
1042 }
1043
1044 fn test_panic_aborts_spawn<R: Runner>(runner: R)
1045 where
1046 R::Context: Spawner + Clock,
1047 {
1048 runner.start(|context| async move {
1049 context.child("panic").spawn(|_| async move {
1050 panic!("blah");
1051 });
1052
1053 loop {
1055 context.sleep(Duration::from_millis(100)).await;
1056 }
1057 });
1058 }
1059
1060 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1061 where
1062 R::Context: Spawner + Clock,
1063 {
1064 let result: Result<(), Error> = runner.start(|context| async move {
1065 let result = context.child("panic").spawn(|_| async move {
1066 panic!("blah");
1067 });
1068 result.await
1069 });
1070 assert!(matches!(result, Err(Error::Exited)));
1071 }
1072
1073 fn test_multiple_panics<R: Runner>(runner: R)
1074 where
1075 R::Context: Spawner + Clock,
1076 {
1077 runner.start(|context| async move {
1078 context.child("panic").spawn(|_| async move {
1079 panic!("boom 1");
1080 });
1081 context.child("panic").spawn(|_| async move {
1082 panic!("boom 2");
1083 });
1084 context.child("panic").spawn(|_| async move {
1085 panic!("boom 3");
1086 });
1087
1088 loop {
1090 context.sleep(Duration::from_millis(100)).await;
1091 }
1092 });
1093 }
1094
1095 fn test_multiple_panics_caught<R: Runner>(runner: R)
1096 where
1097 R::Context: Spawner + Clock,
1098 {
1099 let (res1, res2, res3) = runner.start(|context| async move {
1100 let handle1 = context.child("panic").spawn(|_| async move {
1101 panic!("boom 1");
1102 });
1103 let handle2 = context.child("panic").spawn(|_| async move {
1104 panic!("boom 2");
1105 });
1106 let handle3 = context.child("panic").spawn(|_| async move {
1107 panic!("boom 3");
1108 });
1109
1110 join!(handle1, handle2, handle3)
1111 });
1112 assert!(matches!(res1, Err(Error::Exited)));
1113 assert!(matches!(res2, Err(Error::Exited)));
1114 assert!(matches!(res3, Err(Error::Exited)));
1115 }
1116
1117 fn test_select<R: Runner>(runner: R) {
1118 runner.start(|_| async move {
1119 let output = Mutex::new(0);
1121 select! {
1122 v1 = ready(1) => {
1123 *output.lock() = v1;
1124 },
1125 v2 = ready(2) => {
1126 *output.lock() = v2;
1127 },
1128 };
1129 assert_eq!(*output.lock(), 1);
1130
1131 select! {
1133 v1 = std::future::pending::<i32>() => {
1134 *output.lock() = v1;
1135 },
1136 v2 = ready(2) => {
1137 *output.lock() = v2;
1138 },
1139 };
1140 assert_eq!(*output.lock(), 2);
1141 });
1142 }
1143
1144 fn test_select_loop<R: Runner>(runner: R)
1146 where
1147 R::Context: Clock,
1148 {
1149 runner.start(|context| async move {
1150 let (sender, mut receiver) = mpsc::unbounded_channel();
1152 for _ in 0..2 {
1153 select! {
1154 v = receiver.recv() => {
1155 panic!("unexpected value: {v:?}");
1156 },
1157 _ = context.sleep(Duration::from_millis(100)) => {
1158 continue;
1159 },
1160 };
1161 }
1162
1163 sender.send(0).unwrap();
1165 sender.send(1).unwrap();
1166
1167 select! {
1169 _ = async {} => {
1170 },
1172 v = receiver.recv() => {
1173 panic!("unexpected value: {v:?}");
1174 },
1175 };
1176
1177 for i in 0..2 {
1179 select! {
1180 _ = context.sleep(Duration::from_millis(100)) => {
1181 panic!("timeout");
1182 },
1183 v = receiver.recv() => {
1184 assert_eq!(v.unwrap(), i);
1185 },
1186 };
1187 }
1188 });
1189 }
1190
1191 fn test_storage_operations<R: Runner>(runner: R)
1192 where
1193 R::Context: Storage,
1194 {
1195 runner.start(|context| async move {
1196 let partition = "test_partition";
1197 let name = b"test_blob";
1198
1199 let (blob, size) = context
1201 .open(partition, name)
1202 .await
1203 .expect("Failed to open blob");
1204 assert_eq!(size, 0, "new blob should have size 0");
1205
1206 let data = b"Hello, Storage!";
1208 blob.write_at(0, data)
1209 .await
1210 .expect("Failed to write to blob");
1211
1212 blob.sync().await.expect("Failed to sync blob");
1214
1215 let read = blob
1217 .read_at(0, data.len())
1218 .await
1219 .expect("Failed to read from blob");
1220 assert_eq!(read.coalesce(), data);
1221
1222 blob.sync().await.expect("Failed to sync blob");
1224
1225 let blobs = context
1227 .scan(partition)
1228 .await
1229 .expect("Failed to scan partition");
1230 assert!(blobs.contains(&name.to_vec()));
1231
1232 let (blob, len) = context
1234 .open(partition, name)
1235 .await
1236 .expect("Failed to reopen blob");
1237 assert_eq!(len, data.len() as u64);
1238
1239 let read = blob.read_at(7, 7).await.expect("Failed to read data");
1241 assert_eq!(read.coalesce(), b"Storage");
1242
1243 blob.sync().await.expect("Failed to sync blob");
1245
1246 context
1248 .remove(partition, Some(name))
1249 .await
1250 .expect("Failed to remove blob");
1251
1252 let blobs = context
1254 .scan(partition)
1255 .await
1256 .expect("Failed to scan partition");
1257 assert!(!blobs.contains(&name.to_vec()));
1258
1259 context
1261 .remove(partition, None)
1262 .await
1263 .expect("Failed to remove partition");
1264
1265 let result = context.scan(partition).await;
1267 assert!(matches!(result, Err(Error::PartitionMissing(_))));
1268 });
1269 }
1270
1271 fn test_blob_read_write<R: Runner>(runner: R)
1272 where
1273 R::Context: Storage,
1274 {
1275 runner.start(|context| async move {
1276 let partition = "test_partition";
1277 let name = b"test_blob_rw";
1278
1279 let (blob, _) = context
1281 .open(partition, name)
1282 .await
1283 .expect("Failed to open blob");
1284
1285 let data1 = b"Hello";
1287 let data2 = b"World";
1288 blob.write_at(0, data1)
1289 .await
1290 .expect("Failed to write data1");
1291 blob.write_at(5, data2)
1292 .await
1293 .expect("Failed to write data2");
1294
1295 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1297 let read = read.coalesce();
1298 assert_eq!(&read.as_ref()[..5], data1);
1299 assert_eq!(&read.as_ref()[5..], data2);
1300
1301 let result = blob.read_at(10, 10).await;
1303 assert!(result.is_err());
1304
1305 let data3 = b"Store";
1307 blob.write_at(5, data3)
1308 .await
1309 .expect("Failed to write data3");
1310
1311 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1313 let read = read.coalesce();
1314 assert_eq!(&read.as_ref()[..5], data1);
1315 assert_eq!(&read.as_ref()[5..], data3);
1316
1317 let result = blob.read_at(10, 10).await;
1319 assert!(result.is_err());
1320 });
1321 }
1322
1323 fn test_blob_resize<R: Runner>(runner: R)
1324 where
1325 R::Context: Storage,
1326 {
1327 runner.start(|context| async move {
1328 let partition = "test_partition_resize";
1329 let name = b"test_blob_resize";
1330
1331 let (blob, _) = context
1333 .open(partition, name)
1334 .await
1335 .expect("Failed to open blob");
1336
1337 let data = b"some data";
1338 blob.write_at(0, data.to_vec())
1339 .await
1340 .expect("Failed to write");
1341 blob.sync().await.expect("Failed to sync after write");
1342
1343 let (blob, len) = context.open(partition, name).await.unwrap();
1345 assert_eq!(len, data.len() as u64);
1346
1347 let new_len = (data.len() as u64) * 2;
1349 blob.resize(new_len)
1350 .await
1351 .expect("Failed to resize to extend");
1352 blob.sync().await.expect("Failed to sync after resize");
1353
1354 let (blob, len) = context.open(partition, name).await.unwrap();
1356 assert_eq!(len, new_len);
1357
1358 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1360 assert_eq!(read_buf.coalesce(), data);
1361
1362 let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
1364 assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());
1365
1366 blob.resize(data.len() as u64).await.unwrap();
1368 blob.sync().await.unwrap();
1369
1370 let (blob, size) = context.open(partition, name).await.unwrap();
1372 assert_eq!(size, data.len() as u64);
1373
1374 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1376 assert_eq!(read_buf.coalesce(), data);
1377 blob.sync().await.unwrap();
1378 });
1379 }
1380
1381 fn test_many_partition_read_write<R: Runner>(runner: R)
1382 where
1383 R::Context: Storage,
1384 {
1385 runner.start(|context| async move {
1386 let partitions = ["partition1", "partition2", "partition3"];
1387 let name = b"test_blob_rw";
1388 let data1 = b"Hello";
1389 let data2 = b"World";
1390
1391 for (additional, partition) in partitions.iter().enumerate() {
1392 let (blob, _) = context
1394 .open(partition, name)
1395 .await
1396 .expect("Failed to open blob");
1397
1398 blob.write_at(0, data1)
1400 .await
1401 .expect("Failed to write data1");
1402 blob.write_at(5 + additional as u64, data2)
1403 .await
1404 .expect("Failed to write data2");
1405
1406 blob.sync().await.expect("Failed to sync blob");
1408 }
1409
1410 for (additional, partition) in partitions.iter().enumerate() {
1411 let (blob, len) = context
1413 .open(partition, name)
1414 .await
1415 .expect("Failed to open blob");
1416 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1417
1418 let read = blob
1420 .read_at(0, 10 + additional)
1421 .await
1422 .expect("Failed to read data");
1423 let read = read.coalesce();
1424 assert_eq!(&read.as_ref()[..5], b"Hello");
1425 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1426 }
1427 });
1428 }
1429
1430 fn test_blob_read_past_length<R: Runner>(runner: R)
1431 where
1432 R::Context: Storage,
1433 {
1434 runner.start(|context| async move {
1435 let partition = "test_partition";
1436 let name = b"test_blob_rw";
1437
1438 let (blob, _) = context
1440 .open(partition, name)
1441 .await
1442 .expect("Failed to open blob");
1443
1444 let result = blob.read_at(0, 10).await;
1446 assert!(result.is_err());
1447
1448 let data = b"Hello, Storage!".to_vec();
1450 blob.write_at(0, data)
1451 .await
1452 .expect("Failed to write to blob");
1453
1454 let result = blob.read_at(0, 20).await;
1456 assert!(result.is_err());
1457 })
1458 }
1459
1460 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1461 where
1462 R::Context: Spawner + Storage + Metrics,
1463 {
1464 runner.start(|context| async move {
1465 let partition = "test_partition";
1466 let name = b"test_blob_rw";
1467
1468 let (blob, _) = context
1470 .open(partition, name)
1471 .await
1472 .expect("Failed to open blob");
1473
1474 let data = b"Hello, Storage!";
1476 blob.write_at(0, data)
1477 .await
1478 .expect("Failed to write to blob");
1479
1480 blob.sync().await.expect("Failed to sync blob");
1482
1483 let check1 = context.child("check1").spawn({
1485 let blob = blob.clone();
1486 let data_len = data.len();
1487 move |_| async move {
1488 let read = blob
1489 .read_at(0, data_len)
1490 .await
1491 .expect("Failed to read from blob");
1492 assert_eq!(read.coalesce(), data);
1493 }
1494 });
1495 let check2 = context.child("check2").spawn({
1496 let blob = blob.clone();
1497 let data_len = data.len();
1498 move |_| async move {
1499 let read = blob
1500 .read_at(0, data_len)
1501 .await
1502 .expect("Failed to read from blob");
1503 assert_eq!(read.coalesce(), data);
1504 }
1505 });
1506
1507 let result = join!(check1, check2);
1509 assert!(result.0.is_ok());
1510 assert!(result.1.is_ok());
1511
1512 let read = blob
1514 .read_at(0, data.len())
1515 .await
1516 .expect("Failed to read from blob");
1517 assert_eq!(read.coalesce(), data);
1518
1519 drop(blob);
1521
1522 let buffer = context.encode();
1524 assert!(buffer.contains("open_blobs 0"));
1525 });
1526 }
1527
1528 fn test_shutdown<R: Runner>(runner: R)
1529 where
1530 R::Context: Spawner + Metrics + Clock,
1531 {
1532 let kill = 9;
1533 runner.start(|context| async move {
1534 let before = context.child("before").spawn(move |context| async move {
1536 let mut signal = context.stopped();
1537 let value = (&mut signal).await.unwrap();
1538 assert_eq!(value, kill);
1539 drop(signal);
1540 });
1541
1542 let result = context.child("stop").stop(kill, None).await;
1544 assert!(result.is_ok());
1545
1546 let after = context.child("after").spawn(move |context| async move {
1548 let value = context.stopped().await.unwrap();
1550 assert_eq!(value, kill);
1551 });
1552
1553 let result = join!(before, after);
1555 assert!(result.0.is_ok());
1556 assert!(result.1.is_ok());
1557 });
1558 }
1559
1560 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1561 where
1562 R::Context: Spawner + Metrics + Clock,
1563 {
1564 let kill = 42;
1565 runner.start(|context| async move {
1566 let (started_tx, mut started_rx) = mpsc::channel(3);
1567 let counter = Arc::new(AtomicU32::new(0));
1568
1569 let task = |context: R::Context, cleanup_duration: Duration| {
1572 let counter = counter.clone();
1573 let started_tx = started_tx.clone();
1574 context.spawn(move |context| async move {
1575 let mut signal = context.stopped();
1577 started_tx.send(()).await.unwrap();
1578
1579 let value = (&mut signal).await.unwrap();
1581 assert_eq!(value, kill);
1582 context.sleep(cleanup_duration).await;
1583 counter.fetch_add(1, Ordering::SeqCst);
1584
1585 drop(signal);
1587 })
1588 };
1589
1590 let task1 = task(context.child("cleanup"), Duration::from_millis(10));
1591 let task2 = task(context.child("cleanup"), Duration::from_millis(20));
1592 let task3 = task(context.child("cleanup"), Duration::from_millis(30));
1593
1594 for _ in 0..3 {
1596 started_rx.recv().await.unwrap();
1597 }
1598
1599 context.stop(kill, None).await.unwrap();
1601 assert_eq!(counter.load(Ordering::SeqCst), 3);
1602
1603 let result = join!(task1, task2, task3);
1605 assert!(result.0.is_ok());
1606 assert!(result.1.is_ok());
1607 assert!(result.2.is_ok());
1608 });
1609 }
1610
1611 fn test_shutdown_timeout<R: Runner>(runner: R)
1612 where
1613 R::Context: Spawner + Metrics + Clock,
1614 {
1615 let kill = 42;
1616 runner.start(|context| async move {
1617 let (started_tx, started_rx) = oneshot::channel();
1619
1620 context.child("signal").spawn(move |context| async move {
1622 let signal = context.stopped();
1623 started_tx.send(()).unwrap();
1624 pending::<()>().await;
1625 signal.await.unwrap();
1626 });
1627
1628 started_rx.await.unwrap();
1630 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1631
1632 assert!(matches!(result, Err(Error::Timeout)));
1634 });
1635 }
1636
1637 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1638 where
1639 R::Context: Spawner + Metrics + Clock,
1640 {
1641 let kill1 = 42;
1642 let kill2 = 43;
1643
1644 runner.start(|context| async move {
1645 let (started_tx, started_rx) = oneshot::channel();
1646 let counter = Arc::new(AtomicU32::new(0));
1647
1648 let task = context.child("blocking_task").spawn({
1650 let counter = counter.clone();
1651 move |context| async move {
1652 let mut signal = context.stopped();
1654 started_tx.send(()).unwrap();
1655
1656 let value = (&mut signal).await.unwrap();
1658 assert_eq!(value, kill1);
1659 context.sleep(Duration::from_millis(50)).await;
1660
1661 counter.fetch_add(1, Ordering::SeqCst);
1663 drop(signal);
1664 }
1665 });
1666
1667 started_rx.await.unwrap();
1669
1670 let stop_task1 = context.child("stop").stop(kill1, None);
1673 pin_mut!(stop_task1);
1674 let stop_task2 = context.child("stop").stop(kill2, None);
1675 pin_mut!(stop_task2);
1676
1677 assert!(stop_task1.as_mut().now_or_never().is_none());
1679 assert!(stop_task2.as_mut().now_or_never().is_none());
1680
1681 assert!(stop_task1.await.is_ok());
1683 assert!(stop_task2.await.is_ok());
1684
1685 let sig = context.stopped().await;
1687 assert_eq!(sig.unwrap(), kill1);
1688
1689 let result = task.await;
1691 assert!(result.is_ok());
1692 assert_eq!(counter.load(Ordering::SeqCst), 1);
1693
1694 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1696 });
1697 }
1698
1699 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1700 where
1701 R::Context: Spawner + Metrics,
1702 {
1703 runner.start(|context| async move {
1704 context.child("before").spawn(move |context| async move {
1706 let mut signal = context.stopped();
1707 let value = (&mut signal).await.unwrap();
1708
1709 assert_eq!(value, 42);
1711 drop(signal);
1712 });
1713
1714 reschedule().await;
1716 });
1717 }
1718
1719 fn test_spawn_dedicated<R: Runner>(runner: R)
1720 where
1721 R::Context: Spawner,
1722 {
1723 runner.start(|context| async move {
1724 let handle = context.dedicated().spawn(|_| async move { 42 });
1725 assert!(matches!(handle.await, Ok(42)));
1726 });
1727 }
1728
1729 fn test_spawn<R: Runner>(runner: R)
1730 where
1731 R::Context: Spawner + Clock,
1732 {
1733 runner.start(|context| async move {
1734 let child_handle = Arc::new(Mutex::new(None));
1735 let child_handle2 = child_handle.clone();
1736
1737 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1738 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1739 let parent_handle = context.spawn(move |context| async move {
1740 let handle = context.spawn(|_| async {});
1742
1743 *child_handle2.lock() = Some(handle);
1745
1746 parent_initialized_tx.send(()).unwrap();
1747
1748 parent_complete_rx.await.unwrap();
1750 });
1751
1752 parent_initialized_rx.await.unwrap();
1754
1755 let child_handle = child_handle.lock().take().unwrap();
1757 assert!(child_handle.await.is_ok());
1758
1759 parent_complete_tx.send(()).unwrap();
1761
1762 assert!(parent_handle.await.is_ok());
1764 });
1765 }
1766
1767 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1768 where
1769 R::Context: Spawner + Clock,
1770 {
1771 runner.start(|context| async move {
1772 let child_handle = Arc::new(Mutex::new(None));
1773 let child_handle2 = child_handle.clone();
1774
1775 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1776 let parent_handle = context.spawn(move |context| async move {
1777 let handle = context.spawn(|_| pending::<()>());
1779
1780 *child_handle2.lock() = Some(handle);
1782
1783 parent_initialized_tx.send(()).unwrap();
1784
1785 pending::<()>().await
1787 });
1788
1789 parent_initialized_rx.await.unwrap();
1791
1792 parent_handle.abort();
1794 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1795
1796 let child_handle = child_handle.lock().take().unwrap();
1798 assert!(matches!(child_handle.await, Err(Error::Closed)));
1799 });
1800 }
1801
1802 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1803 where
1804 R::Context: Spawner + Clock,
1805 {
1806 runner.start(|context| async move {
1807 let child_handle = Arc::new(Mutex::new(None));
1808 let child_handle2 = child_handle.clone();
1809
1810 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1811 let parent_handle = context.spawn(move |context| async move {
1812 let handle = context.spawn(|_| pending::<()>());
1814
1815 *child_handle2.lock() = Some(handle);
1817
1818 parent_complete_rx.await.unwrap();
1820 });
1821
1822 parent_complete_tx.send(()).unwrap();
1824
1825 assert!(parent_handle.await.is_ok());
1827
1828 let child_handle = child_handle.lock().take().unwrap();
1830 assert!(matches!(child_handle.await, Err(Error::Closed)));
1831 });
1832 }
1833
/// Checks that aborting a root task closes a two-level tree of descendant tasks.
///
/// Three contexts (`c0`..`c2`) are derived from the runner context and two more
/// (`g0`..`g5`) are derived from each of those. A root task spawns one task per `cN`
/// context, and each of those spawns one task per `gN` context: 9 tasks besides the root.
/// After the root is aborted, the root handle and all 9 collected handles must resolve to
/// `Error::Closed`.
fn test_spawn_cascading_abort<R: Runner>(runner: R)
where
    R::Context: Spawner + Clock,
{
    runner.start(|context| async move {
        // Context tree: every `cN` is a child of `context`, every `gN` a child of a `cN`.
        let c0 = context.child("c0");
        let g0 = c0.child("g0");
        let g1 = c0.child("g1");
        let c1 = context.child("c1");
        let g2 = c1.child("g2");
        let g3 = c1.child("g3");
        let c2 = context.child("c2");
        let g4 = c2.child("g4");
        let g5 = c2.child("g5");

        // Handles of every descendant task, pushed by whichever task spawned it.
        let handles = Arc::new(Mutex::new(Vec::new()));
        // One message is sent per spawned descendant (3 children + 6 grandchildren).
        let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
        let root_task = context.spawn({
            let handles = handles.clone();
            move |_| async move {
                for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                {
                    // Spawn the child task on its pre-built context.
                    let handle = context.spawn({
                        let handles = handles.clone();
                        let initialized_tx = initialized_tx.clone();
                        move |_| async move {
                            for grandchild in grandchildren {
                                // Grandchild tasks never complete by themselves.
                                let handle = grandchild.spawn(|_| async {
                                    pending::<()>().await;
                                });
                                handles.lock().push(handle);
                                initialized_tx.send(()).await.unwrap();
                            }

                            // Keep the child task alive indefinitely.
                            pending::<()>().await;
                        }
                    });
                    handles.lock().push(handle);
                    initialized_tx.send(()).await.unwrap();
                }

                // Keep the root task alive until it is aborted.
                pending::<()>().await;
            }
        });

        // Wait until all 9 descendants have been spawned and recorded.
        for _ in 0..9 {
            initialized_rx.recv().await.unwrap();
        }

        assert_eq!(handles.lock().len(), 9);

        // Abort the root and confirm it reports `Closed`.
        root_task.abort();
        assert!(matches!(root_task.await, Err(Error::Closed)));

        // Move the handles out first so the lock is not held while awaiting them.
        let handles = handles.lock().drain(..).collect::<Vec<_>>();
        for handle in handles {
            assert!(matches!(handle.await, Err(Error::Closed)));
        }
    });
}
1908
1909 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1910 where
1911 R::Context: Spawner + Clock,
1912 {
1913 runner.start(|context| async move {
1914 let (child_started_tx, child_started_rx) = oneshot::channel();
1915 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1916 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1917 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1918 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1919 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1920 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1921
1922 let parent = context.spawn(move |context| async move {
1923 let child_handle = context.child("child").spawn(|_| async move {
1925 child_started_tx.send(()).unwrap();
1926 child_complete_rx.await.unwrap();
1928 });
1929 assert!(
1930 child_handle_tx.send(child_handle).is_ok(),
1931 "child handle receiver dropped"
1932 );
1933
1934 let sibling_handle = context.child("sibling").spawn(move |_| async move {
1936 sibling_started_tx.send(()).unwrap();
1937 sibling_complete_rx.await.unwrap();
1939 });
1940 assert!(
1941 sibling_handle_tx.send(sibling_handle).is_ok(),
1942 "sibling handle receiver dropped"
1943 );
1944
1945 parent_complete_rx.await.unwrap();
1947 });
1948
1949 child_started_rx.await.unwrap();
1951 sibling_started_rx.await.unwrap();
1952
1953 sibling_complete_tx.send(()).unwrap();
1955 assert!(sibling_handle_rx.await.is_ok());
1956
1957 child_complete_tx.send(()).unwrap();
1959 assert!(child_handle_rx.await.is_ok());
1960
1961 parent_complete_tx.send(()).unwrap();
1963 assert!(parent.await.is_ok());
1964 });
1965 }
1966
1967 fn test_spawn_clone_chain<R: Runner>(runner: R)
1968 where
1969 R::Context: Spawner + Clock,
1970 {
1971 runner.start(|context| async move {
1972 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1973 let (child_started_tx, child_started_rx) = oneshot::channel();
1974 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1975 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1976 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1977
1978 let parent = context.child("parent").spawn({
1979 move |context| async move {
1980 let child = context.child("child").spawn({
1981 move |context| async move {
1982 let grandchild = context.child("grandchild").spawn({
1983 move |_| async move {
1984 grandchild_started_tx.send(()).unwrap();
1985 pending::<()>().await;
1986 }
1987 });
1988 assert!(
1989 grandchild_handle_tx.send(grandchild).is_ok(),
1990 "grandchild handle receiver dropped"
1991 );
1992 child_started_tx.send(()).unwrap();
1993 pending::<()>().await;
1994 }
1995 });
1996 assert!(
1997 child_handle_tx.send(child).is_ok(),
1998 "child handle receiver dropped"
1999 );
2000 parent_started_tx.send(()).unwrap();
2001 pending::<()>().await;
2002 }
2003 });
2004
2005 parent_started_rx.await.unwrap();
2006 child_started_rx.await.unwrap();
2007 grandchild_started_rx.await.unwrap();
2008
2009 let child_handle = child_handle_rx.await.unwrap();
2010 let grandchild_handle = grandchild_handle_rx.await.unwrap();
2011
2012 parent.abort();
2013 assert!(parent.await.is_err());
2014
2015 assert!(child_handle.await.is_err());
2016 assert!(grandchild_handle.await.is_err());
2017 });
2018 }
2019
2020 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
2021 where
2022 R::Context: Spawner + Clock,
2023 {
2024 runner.start(|context| async move {
2025 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
2026 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
2027
2028 let parent = context.child("parent").spawn({
2029 move |context| async move {
2030 let clone1 = context.child("clone1");
2031 let clone2 = clone1.child("clone2");
2032 let clone3 = clone2.child("clone3");
2033
2034 let leaf = clone3.spawn({
2035 move |_| async move {
2036 leaf_started_tx.send(()).unwrap();
2037 pending::<()>().await;
2038 }
2039 });
2040
2041 leaf_handle_tx
2042 .send(leaf)
2043 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
2044 pending::<()>().await;
2045 }
2046 });
2047
2048 leaf_started_rx.await.unwrap();
2049 let leaf_handle = leaf_handle_rx.await.unwrap();
2050
2051 parent.abort();
2052 assert!(parent.await.is_err());
2053 assert!(leaf_handle.await.is_err());
2054 });
2055 }
2056
2057 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2058 where
2059 R::Context: Spawner,
2060 {
2061 runner.start(|context| async move {
2062 let context = if dedicated {
2063 context.dedicated()
2064 } else {
2065 context.shared(true)
2066 };
2067
2068 let handle = context.spawn(|_| async move { 42 });
2069 let result = handle.await;
2070 assert!(matches!(result, Ok(42)));
2071 });
2072 }
2073
2074 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2075 where
2076 R::Context: Spawner + Clock,
2077 {
2078 runner.start(|context| async move {
2079 if dedicated {
2080 context.child("blocking").dedicated().spawn(|_| async move {
2081 panic!("blocking task panicked");
2082 });
2083 } else {
2084 context
2085 .child("blocking")
2086 .shared(true)
2087 .spawn(|_| async move {
2088 panic!("blocking task panicked");
2089 });
2090 }
2091
2092 loop {
2094 context.sleep(Duration::from_millis(100)).await;
2095 }
2096 });
2097 }
2098
2099 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2100 where
2101 R::Context: Spawner + Clock,
2102 {
2103 let result: Result<(), Error> = runner.start(|context| async move {
2104 let handle = if dedicated {
2105 context.child("blocking").dedicated().spawn(|_| async move {
2106 panic!("blocking task panicked");
2107 })
2108 } else {
2109 context
2110 .child("blocking")
2111 .shared(true)
2112 .spawn(|_| async move {
2113 panic!("blocking task panicked");
2114 })
2115 };
2116 handle.await
2117 });
2118 assert!(matches!(result, Err(Error::Exited)));
2119 }
2120
/// Checks that two tasks holding senders to each other's channels do not keep their
/// captured state alive after the runner that hosted them has returned.
///
/// Inside the outer runner, a nested `deterministic::Runner` starts two tasks. Each task
/// owns the sender for the other task's receiver plus a clone of the `dropper` `Arc`. Once
/// the nested `start` returns, `Arc::try_unwrap(dropper)` must succeed, meaning no clone of
/// `dropper` is still held anywhere.
fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
    runner.start(|_| async move {
        // Reference-counted marker used to detect state that was never dropped.
        let dropper = Arc::new(());
        // The scenario always runs on a nested deterministic runner; the outer context is
        // unused.
        let executor = deterministic::Runner::default();
        executor.start({
            let dropper = dropper.clone();
            move |context| async move {
                // Each task reports on `setup_tx` once it has heard from its peer.
                let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
                // `tx1`/`rx1` carry messages to task1, `tx2`/`rx2` to task2.
                let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
                let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();

                context.child("task1").spawn({
                    // task1 gets its own clones; the originals are moved into task2 below.
                    let setup_tx = setup_tx.clone();
                    let dropper = dropper.clone();
                    move |_| async move {
                        // Greet the peer, wait for its greeting, then report setup done.
                        tx2.send(()).unwrap();
                        rx1.recv().await.unwrap();
                        setup_tx.send(()).unwrap();

                        // Loop until `recv` yields `None`, i.e. the peer's sender is gone.
                        while rx1.recv().await.is_some() {}
                        drop(tx2);
                        drop(dropper);
                    }
                });

                context.child("task2").spawn(move |_| async move {
                    // Mirror image of task1, using the opposite channel ends.
                    tx1.send(()).unwrap();
                    rx2.recv().await.unwrap();
                    setup_tx.send(()).unwrap();

                    while rx2.recv().await.is_some() {}
                    drop(tx1);
                    drop(dropper);
                });

                // Return from the root future once both tasks have reported setup.
                setup_rx.recv().await.unwrap();
                setup_rx.recv().await.unwrap();
            }
        });

        // Fails with "references remaining" if any clone of `dropper` is still alive.
        Arc::try_unwrap(dropper).expect("references remaining");
    });
}
2174
/// Exercises waking a task's `Waker` after the runner that created it has returned.
///
/// A spawned task publishes a clone of its waker and stays pending. The root future wraps
/// that waker in a `WakeOnDrop` and returns it from `start`; dropping the wrapper afterwards
/// invokes the waker. The test has no assertions beyond the `expect` on the channel, so it
/// passes as long as nothing panics.
fn test_late_waker<R: Runner>(runner: R)
where
    R::Context: Metrics + Spawner,
{
    /// Future that hands out the waker it is first polled with and never completes.
    struct CaptureWaker {
        // Sender used to publish the waker; taken on the first poll.
        tx: Option<oneshot::Sender<Waker>>,
        // Set after the first poll so later polls skip the send.
        sent: bool,
    }
    impl Future for CaptureWaker {
        type Output = ();
        fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
            if !self.sent {
                if let Some(tx) = self.tx.take() {
                    // A failed send (receiver gone) is deliberately ignored.
                    let _ = tx.send(cx.waker().clone());
                }
                self.sent = true;
            }
            Poll::Pending
        }
    }

    /// Guard that wakes the wrapped waker, if any, when dropped.
    struct WakeOnDrop(Option<Waker>);
    impl Drop for WakeOnDrop {
        fn drop(&mut self) {
            if let Some(w) = self.0.take() {
                w.wake_by_ref();
            }
        }
    }

    let holder = runner.start(|context| async move {
        let (tx, rx) = oneshot::channel::<Waker>();

        // Task that stays pending forever after publishing its waker.
        context.child("capture_waker").spawn(move |_| async move {
            CaptureWaker {
                tx: Some(tx),
                sent: false,
            }
            .await;
        });

        // Yield once before waiting for the waker.
        utils::reschedule().await;

        let waker = rx.await.expect("waker not received");

        // Returned from `start`, so the guard outlives the root future.
        WakeOnDrop(Some(waker))
    });

    // Triggers `wake_by_ref` only now, after `start` has returned.
    drop(holder);
}
2237
2238 fn test_metrics<R: Runner>(runner: R)
2239 where
2240 R::Context: Metrics,
2241 {
2242 runner.start(|context| async move {
2243 assert_eq!(context.name().label, "");
2245
2246 let counter = Counter::<u64>::default();
2248 let _registered = context.register("test", "test", counter.clone());
2249
2250 counter.inc();
2252
2253 let buffer = context.encode();
2255 assert!(buffer.contains("test_total 1"));
2256
2257 let context = context.child("nested");
2259 let nested_counter = Counter::<u64>::default();
2260 let _nested_registered = context.register("test", "test", nested_counter.clone());
2261
2262 nested_counter.inc();
2264
2265 let buffer = context.encode();
2267 assert!(buffer.contains("nested_test_total 1"));
2268 assert!(buffer.contains("test_total 1"));
2269 });
2270 }
2271
2272 fn test_metrics_with_attribute<R: Runner>(runner: R)
2273 where
2274 R::Context: Metrics,
2275 {
2276 runner.start(|context| async move {
2277 let ctx_epoch5 = context.child("consensus").with_attribute("epoch", "e5");
2279
2280 let counter = Counter::<u64>::default();
2282 let _epoch5 = ctx_epoch5.register("votes", "vote count", counter.clone());
2283 counter.inc();
2284
2285 let buffer = context.encode();
2287 assert!(
2288 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2289 "Expected metric with epoch attribute, got: {}",
2290 buffer
2291 );
2292
2293 let ctx_epoch6 = context.child("consensus").with_attribute("epoch", "e6");
2295 let counter2 = Counter::<u64>::default();
2296 let _epoch6 = ctx_epoch6.register("votes", "vote count", counter2.clone());
2297 counter2.inc();
2298 counter2.inc();
2299
2300 let buffer = context.encode();
2302 assert!(
2303 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2304 "Expected metric with epoch=e5, got: {}",
2305 buffer
2306 );
2307 assert!(
2308 buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
2309 "Expected metric with epoch=e6, got: {}",
2310 buffer
2311 );
2312
2313 assert_eq!(
2315 buffer.matches("# HELP consensus_votes").count(),
2316 1,
2317 "HELP should appear exactly once, got: {}",
2318 buffer
2319 );
2320 assert_eq!(
2321 buffer.matches("# TYPE consensus_votes").count(),
2322 1,
2323 "TYPE should appear exactly once, got: {}",
2324 buffer
2325 );
2326
2327 let ctx_multi = context
2329 .child("engine")
2330 .with_attribute("region", "us")
2331 .with_attribute("instance", "i1");
2332 let counter3 = Counter::<u64>::default();
2333 let _multi = ctx_multi.register("requests", "request count", counter3.clone());
2334 counter3.inc();
2335
2336 let buffer = context.encode();
2337 assert!(
2338 buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
2339 "Expected metric with sorted attributes, got: {}",
2340 buffer
2341 );
2342 });
2343 }
2344
/// Runs `test_metrics_with_attribute` on the deterministic runner.
#[test]
fn test_deterministic_metrics_with_attribute() {
    test_metrics_with_attribute(deterministic::Runner::default());
}
2350
/// Runs `test_metrics_with_attribute` on the tokio runner.
#[test]
fn test_tokio_metrics_with_attribute() {
    test_metrics_with_attribute(tokio::Runner::default());
}
2356
2357 fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2358 where
2359 R::Context: Metrics,
2360 {
2361 runner.start(|context| async move {
2362 let ctx = context
2364 .child("orchestrator")
2365 .with_attribute("epoch", "e5")
2366 .child("engine");
2367
2368 let counter = Counter::<u64>::default();
2370 let _registered = ctx.register("votes", "vote count", counter.clone());
2371 counter.inc();
2372
2373 let buffer = context.encode();
2375 assert!(
2376 buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2377 "Expected metric with preserved epoch attribute, got: {}",
2378 buffer
2379 );
2380
2381 let ctx2 = context
2383 .child("outer")
2384 .with_attribute("region", "us")
2385 .child("middle")
2386 .with_attribute("az", "east")
2387 .child("inner");
2388
2389 let counter2 = Counter::<u64>::default();
2390 let _registered2 = ctx2.register("requests", "request count", counter2.clone());
2391 counter2.inc();
2392 counter2.inc();
2393
2394 let buffer = context.encode();
2395 assert!(
2396 buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2397 "Expected metric with all attributes preserved and sorted, got: {}",
2398 buffer
2399 );
2400 });
2401 }
2402
/// Runs `test_metrics_attribute_with_nested_label` on the deterministic runner.
#[test]
fn test_deterministic_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(deterministic::Runner::default());
}
2408
/// Runs `test_metrics_attribute_with_nested_label` on the tokio runner.
#[test]
fn test_tokio_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(tokio::Runner::default());
}
2414
2415 fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2416 where
2417 R::Context: Metrics,
2418 {
2419 runner.start(|context| async move {
2420 let ctx_a = context.child("component_a").with_attribute("epoch", 1);
2422 let ctx_b = context.child("component_b").with_attribute("epoch", 2);
2423
2424 let c1 = Counter::<u64>::default();
2426 let _ctx_a_requests = ctx_a.register("requests", "help", c1);
2427
2428 let c2 = Counter::<u64>::default();
2430 let _ctx_b_requests = ctx_b.register("requests", "help", c2);
2431
2432 let c3 = Counter::<u64>::default();
2434 let _ctx_a_errors = ctx_a.register("errors", "help", c3);
2435
2436 let output = context.encode();
2437
2438 assert!(
2440 output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2441 "ctx_a requests should have epoch=1: {output}"
2442 );
2443 assert!(
2444 output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2445 "ctx_a errors should have epoch=1: {output}"
2446 );
2447 assert!(
2448 !output.contains("component_a_requests_total{epoch=\"2\"}"),
2449 "ctx_a requests should not have epoch=2: {output}"
2450 );
2451
2452 assert!(
2454 output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2455 "ctx_b should have epoch=2: {output}"
2456 );
2457 assert!(
2458 !output.contains("component_b_requests_total{epoch=\"1\"}"),
2459 "ctx_b should not have epoch=1: {output}"
2460 );
2461 });
2462 }
2463
/// Runs `test_metrics_attributes_isolated_between_contexts` on the deterministic runner.
#[test]
fn test_deterministic_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(deterministic::Runner::default());
}
2469
/// Runs `test_metrics_attributes_isolated_between_contexts` on the tokio runner.
#[test]
fn test_tokio_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(tokio::Runner::default());
}
2475
2476 fn test_metrics_spawn_attribute_cardinality<R: Runner>(runner: R)
2483 where
2484 R::Context: Spawner + Metrics + Clock,
2485 {
2486 runner.start(|context| async move {
2487 const ROUNDS: u64 = 128;
2488
2489 let mut handles = Vec::with_capacity(ROUNDS as usize);
2490 for round in 0..ROUNDS {
2491 let handle = context
2492 .child("deferred_verify")
2493 .with_attribute("round", round)
2494 .spawn(move |_| async move { round });
2495 handles.push(handle);
2496 }
2497 for (expected, handle) in handles.into_iter().enumerate() {
2498 assert_eq!(handle.await.expect("task failed"), expected as u64);
2499 }
2500
2501 while count_running_tasks(&context, "deferred_verify") > 0 {
2505 context.sleep(Duration::from_millis(10)).await;
2506 }
2507 let buffer = context.encode();
2508
2509 let spawned_lines = buffer
2513 .lines()
2514 .filter(|line| {
2515 line.starts_with("runtime_tasks_spawned_total{")
2516 && line.contains("name=\"deferred_verify\"")
2517 })
2518 .count();
2519 let running_lines = buffer
2520 .lines()
2521 .filter(|line| {
2522 line.starts_with("runtime_tasks_running{")
2523 && line.contains("name=\"deferred_verify\"")
2524 })
2525 .count();
2526 assert_eq!(
2527 spawned_lines, 1,
2528 "expected exactly 1 runtime_tasks_spawned entry for deferred_verify, got {spawned_lines}: {buffer}",
2529 );
2530 assert_eq!(
2531 running_lines, 1,
2532 "expected exactly 1 runtime_tasks_running entry for deferred_verify, got {running_lines}: {buffer}",
2533 );
2534
2535 let spawned_value = format!(
2537 "runtime_tasks_spawned_total{{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"}} {ROUNDS}"
2538 );
2539 assert!(
2540 buffer.contains(&spawned_value),
2541 "expected accumulated spawned counter `{spawned_value}`, got: {buffer}",
2542 );
2543 let running_value = "runtime_tasks_running{name=\"deferred_verify\",kind=\"Task\",execution=\"Shared\"} 0";
2544 assert!(
2545 buffer.contains(running_value),
2546 "expected running gauge to return to 0, got: {buffer}",
2547 );
2548
2549 assert!(
2552 !buffer
2553 .lines()
2554 .any(|line| line.starts_with("runtime_tasks_")
2555 && line.contains("round=")),
2556 "task metrics must not carry `round` attribute: {buffer}",
2557 );
2558 });
2559 }
2560
/// Runs `test_metrics_spawn_attribute_cardinality` on the deterministic runner.
#[test]
fn test_deterministic_metrics_spawn_attribute_cardinality() {
    test_metrics_spawn_attribute_cardinality(deterministic::Runner::default());
}
2566
/// Runs `test_metrics_spawn_attribute_cardinality` on the tokio runner.
#[test]
fn test_tokio_metrics_spawn_attribute_cardinality() {
    test_metrics_spawn_attribute_cardinality(tokio::Runner::default());
}
2572
2573 fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2574 where
2575 R::Context: Metrics,
2576 {
2577 runner.start(|context| async move {
2578 let ctx_ab = context
2580 .child("service")
2581 .with_attribute("region", "us")
2582 .with_attribute("env", "prod");
2583
2584 let ctx_ba = context
2585 .child("service")
2586 .with_attribute("env", "prod")
2587 .with_attribute("region", "us");
2588
2589 let c1 = Counter::<u64>::default();
2591 let _requests = ctx_ab.register("requests", "help", c1.clone());
2592 c1.inc();
2593
2594 let c2 = Counter::<u64>::default();
2596 let _errors = ctx_ba.register("errors", "help", c2.clone());
2597 c2.inc();
2598 c2.inc();
2599
2600 let output = context.encode();
2601
2602 assert!(
2604 output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2605 "requests should have sorted labels: {output}"
2606 );
2607 assert!(
2608 output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2609 "errors should have sorted labels: {output}"
2610 );
2611
2612 assert!(
2614 !output.contains("region=\"us\",env=\"prod\""),
2615 "should not have unsorted label order: {output}"
2616 );
2617 });
2618 }
2619
/// Runs `test_metrics_attributes_sorted_deterministically` on the deterministic runner.
#[test]
fn test_deterministic_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(deterministic::Runner::default());
}
2625
/// Runs `test_metrics_attributes_sorted_deterministically` on the tokio runner.
#[test]
fn test_tokio_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(tokio::Runner::default());
}
2631
2632 fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
2633 where
2634 R::Context: Metrics,
2635 {
2636 runner.start(|context| async move {
2637 let svc_a = context.child("service_a");
2639
2640 let svc_a_v2 = context.child("service_a").with_attribute("version", 2);
2642
2643 let svc_b_worker = context.child("service_b").child("worker");
2645
2646 let svc_b_worker_shard = context
2648 .child("service_b")
2649 .child("worker")
2650 .with_attribute("shard", 99);
2651
2652 let svc_b_manager = context.child("service_b").child("manager");
2654
2655 let svc_c = context.child("service_c");
2657
2658 let c1 = Counter::<u64>::default();
2660 let _svc_a = svc_a.register("requests", "help", c1);
2661
2662 let c2 = Counter::<u64>::default();
2663 let _svc_a_v2 = svc_a_v2.register("requests", "help", c2);
2664
2665 let c3 = Counter::<u64>::default();
2666 let _svc_b_worker = svc_b_worker.register("tasks", "help", c3);
2667
2668 let c4 = Counter::<u64>::default();
2669 let _svc_b_worker_shard = svc_b_worker_shard.register("tasks", "help", c4);
2670
2671 let c5 = Counter::<u64>::default();
2672 let _svc_b_manager = svc_b_manager.register("decisions", "help", c5);
2673
2674 let c6 = Counter::<u64>::default();
2675 let _svc_c = svc_c.register("requests", "help", c6);
2676
2677 let output = context.encode();
2678
2679 assert!(
2681 output.contains("service_a_requests_total 0"),
2682 "svc_a plain should exist: {output}"
2683 );
2684 assert!(
2685 output.contains("service_a_requests_total{version=\"2\"} 0"),
2686 "svc_a_v2 should have version=2: {output}"
2687 );
2688
2689 assert!(
2691 output.contains("service_b_worker_tasks_total 0"),
2692 "svc_b_worker plain should exist: {output}"
2693 );
2694 assert!(
2695 output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
2696 "svc_b_worker_shard should have shard=99: {output}"
2697 );
2698
2699 assert!(
2701 output.contains("service_b_manager_decisions_total 0"),
2702 "svc_b_manager should have no attributes: {output}"
2703 );
2704 assert!(
2705 !output.contains("service_b_manager_decisions_total{"),
2706 "svc_b_manager should have no attributes at all: {output}"
2707 );
2708
2709 assert!(
2711 output.contains("service_c_requests_total 0"),
2712 "svc_c should have no attributes: {output}"
2713 );
2714 assert!(
2715 !output.contains("service_c_requests_total{"),
2716 "svc_c should have no attributes at all: {output}"
2717 );
2718
2719 assert!(
2721 !output.contains("service_b_manager_decisions_total{shard="),
2722 "svc_b_manager should not have shard: {output}"
2723 );
2724 assert!(
2725 !output.contains("service_a_requests_total{shard="),
2726 "svc_a should not have shard: {output}"
2727 );
2728 assert!(
2729 !output.contains("service_c_requests_total{version="),
2730 "svc_c should not have version: {output}"
2731 );
2732 });
2733 }
2734
/// Runs `test_metrics_nested_labels_with_attributes` on the deterministic runner.
#[test]
fn test_deterministic_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(deterministic::Runner::default());
}
2740
/// Runs `test_metrics_nested_labels_with_attributes` on the tokio runner.
#[test]
fn test_tokio_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(tokio::Runner::default());
}
2746
2747 fn test_metrics_family_with_attributes<R: Runner>(runner: R)
2748 where
2749 R::Context: Metrics,
2750 {
2751 runner.start(|context| async move {
2752 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
2753 struct RequestLabels {
2754 method: String,
2755 status: u16,
2756 }
2757
2758 let ctx = context
2760 .child("api")
2761 .with_attribute("region", "us_east")
2762 .with_attribute("env", "prod");
2763
2764 let requests: Family<RequestLabels, Counter<u64>> = Family::default();
2766 let _requests = ctx.register("requests", "HTTP requests", requests.clone());
2767
2768 requests
2770 .get_or_create(&RequestLabels {
2771 method: "GET".to_string(),
2772 status: 200,
2773 })
2774 .inc();
2775 requests
2776 .get_or_create(&RequestLabels {
2777 method: "POST".to_string(),
2778 status: 201,
2779 })
2780 .inc();
2781 requests
2782 .get_or_create(&RequestLabels {
2783 method: "GET".to_string(),
2784 status: 404,
2785 })
2786 .inc();
2787
2788 let output = context.encode();
2789
2790 assert!(
2794 output.contains(
2795 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
2796 ),
2797 "GET 200 should have merged labels: {output}"
2798 );
2799 assert!(
2800 output.contains(
2801 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
2802 ),
2803 "POST 201 should have merged labels: {output}"
2804 );
2805 assert!(
2806 output.contains(
2807 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
2808 ),
2809 "GET 404 should have merged labels: {output}"
2810 );
2811
2812 let ctx_plain = context.child("api_plain");
2814 let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
2815 let _plain_requests =
2816 ctx_plain.register("requests", "HTTP requests", plain_requests.clone());
2817
2818 plain_requests
2819 .get_or_create(&RequestLabels {
2820 method: "DELETE".to_string(),
2821 status: 204,
2822 })
2823 .inc();
2824
2825 let output = context.encode();
2826
2827 assert!(
2829 output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
2830 "plain DELETE should have only family labels: {output}"
2831 );
2832 assert!(
2833 !output.contains("api_plain_requests_total{env="),
2834 "plain should not have env attribute: {output}"
2835 );
2836 assert!(
2837 !output.contains("api_plain_requests_total{region="),
2838 "plain should not have region attribute: {output}"
2839 );
2840 });
2841 }
2842
/// Runs `test_metrics_family_with_attributes` on the deterministic runner.
#[test]
fn test_deterministic_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(deterministic::Runner::default());
}
2848
/// Runs `test_metrics_family_with_attributes` on the tokio runner.
#[test]
fn test_tokio_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(tokio::Runner::default());
}
2854
2855 fn test_register_and_encode<R: Runner>(runner: R)
2856 where
2857 R::Context: Metrics,
2858 {
2859 runner.start(|context| async move {
2860 let counter =
2861 context
2862 .child("engine")
2863 .register("votes", "vote count", Counter::<u64>::default());
2864 counter.inc();
2865
2866 let buffer = context.encode();
2867 assert!(
2868 buffer.contains("engine_votes_total 1"),
2869 "registered metric should appear in encode: {buffer}"
2870 );
2871 });
2872 }
2873
/// Runs `test_register_and_encode` on the deterministic runner.
#[test]
fn test_deterministic_register_and_encode() {
    test_register_and_encode(deterministic::Runner::default());
}
2879
/// Runs `test_register_and_encode` on the tokio runner.
#[test]
fn test_tokio_register_and_encode() {
    test_register_and_encode(tokio::Runner::default());
}
2885
2886 fn test_register_drop_removes_metrics<R: Runner>(runner: R)
2887 where
2888 R::Context: Metrics,
2889 {
2890 runner.start(|context| async move {
2891 let permanent = context.child("permanent").register(
2892 "counter",
2893 "permanent counter",
2894 Counter::<u64>::default(),
2895 );
2896 permanent.inc();
2897
2898 let counter =
2899 context
2900 .child("engine")
2901 .register("votes", "vote count", Counter::<u64>::default());
2902 counter.inc();
2903
2904 let buffer = context.encode();
2905 assert!(buffer.contains("permanent_counter_total 1"));
2906 assert!(buffer.contains("engine_votes_total 1"));
2907
2908 drop(counter);
2909
2910 let buffer = context.encode();
2911 assert!(
2912 buffer.contains("permanent_counter_total 1"),
2913 "other registered metrics should survive handle drop: {buffer}"
2914 );
2915 assert!(
2916 !buffer.contains("engine_votes"),
2917 "metric should be removed after handle drop: {buffer}"
2918 );
2919 });
2920 }
2921
/// Runs `test_register_drop_removes_metrics` on the deterministic runner.
#[test]
fn test_deterministic_register_drop_removes_metrics() {
    test_register_drop_removes_metrics(deterministic::Runner::default());
}
2927
/// Runs `test_register_drop_removes_metrics` on the tokio runner.
#[test]
fn test_tokio_register_drop_removes_metrics() {
    test_register_drop_removes_metrics(tokio::Runner::default());
}
2933
2934 fn test_register_with_attributes<R: Runner>(runner: R)
2935 where
2936 R::Context: Metrics,
2937 {
2938 runner.start(|context| async move {
2939 let epoch1 = context.child("engine").with_attribute("epoch", 1).register(
2940 "votes",
2941 "vote count",
2942 Counter::<u64>::default(),
2943 );
2944 epoch1.inc();
2945
2946 let epoch2 = context.child("engine").with_attribute("epoch", 2).register(
2947 "votes",
2948 "vote count",
2949 Counter::<u64>::default(),
2950 );
2951 epoch2.inc();
2952 epoch2.inc();
2953
2954 let buffer = context.encode();
2955 assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
2956 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2957
2958 assert_eq!(
2959 buffer.matches("# HELP engine_votes").count(),
2960 1,
2961 "HELP should appear once: {buffer}"
2962 );
2963 assert_eq!(
2964 buffer.matches("# TYPE engine_votes").count(),
2965 1,
2966 "TYPE should appear once: {buffer}"
2967 );
2968
2969 drop(epoch1);
2970 let buffer = context.encode();
2971 assert!(
2972 !buffer.contains("epoch=\"1\""),
2973 "epoch 1 should be gone: {buffer}"
2974 );
2975 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2976
2977 drop(epoch2);
2978 let buffer = context.encode();
2979 assert!(
2980 !buffer.contains("engine_votes"),
2981 "all epoch metrics should be gone: {buffer}"
2982 );
2983 });
2984 }
2985
/// Runs `test_register_with_attributes` on the deterministic runner.
#[test]
fn test_deterministic_register_with_attributes() {
    test_register_with_attributes(deterministic::Runner::default());
}
2991
/// Runs `test_register_with_attributes` on the tokio runner.
#[test]
fn test_tokio_register_with_attributes() {
    test_register_with_attributes(tokio::Runner::default());
}
2997
2998 fn test_reregister_after_drop<R: Runner>(runner: R)
2999 where
3000 R::Context: Metrics,
3001 {
3002 runner.start(|context| async move {
3003 let votes = context.child("engine").with_attribute("epoch", 1).register(
3004 "votes",
3005 "vote count",
3006 Counter::<u64>::default(),
3007 );
3008 drop(votes);
3009
3010 let replacement = context.child("engine").with_attribute("epoch", 1).register(
3011 "votes",
3012 "vote count",
3013 Counter::<u64>::default(),
3014 );
3015 drop(replacement);
3016 });
3017 }
3018
/// Runs the shared `test_reregister_after_drop` scenario on a default deterministic runner.
#[test]
fn test_deterministic_reregister_after_drop() {
    test_reregister_after_drop(deterministic::Runner::default());
}
3024
/// Runs the shared `test_reregister_after_drop` scenario on a default tokio runner.
#[test]
fn test_tokio_reregister_after_drop() {
    test_reregister_after_drop(tokio::Runner::default());
}
3030
3031 fn test_register_clone_keeps_metric_alive<R: Runner>(runner: R)
3032 where
3033 R::Context: Metrics,
3034 {
3035 runner.start(|context| async move {
3036 let registered =
3037 context
3038 .child("engine")
3039 .register("votes", "vote count", Counter::<u64>::default());
3040 registered.inc();
3041 let clone = registered.clone();
3042
3043 let buffer = context.encode();
3044 assert!(
3045 buffer.contains("engine_votes_total 1"),
3046 "metric should remain registered while any handle exists: {buffer}"
3047 );
3048
3049 drop(registered);
3050 let buffer = context.encode();
3051 assert!(
3052 buffer.contains("engine_votes_total 1"),
3053 "metric should survive while clone is retained: {buffer}"
3054 );
3055
3056 drop(clone);
3057 let buffer = context.encode();
3058 assert!(
3059 !buffer.contains("engine_votes"),
3060 "metric should be removed when all handle clones are dropped: {buffer}"
3061 );
3062 });
3063 }
3064
/// Runs the shared `test_register_clone_keeps_metric_alive` scenario on a default deterministic runner.
#[test]
fn test_deterministic_register_clone_keeps_metric_alive() {
    test_register_clone_keeps_metric_alive(deterministic::Runner::default());
}
3070
/// Runs the shared `test_register_clone_keeps_metric_alive` scenario on a default tokio runner.
#[test]
fn test_tokio_register_clone_keeps_metric_alive() {
    test_register_clone_keeps_metric_alive(tokio::Runner::default());
}
3076
3077 fn test_encode_single_eof<R: Runner>(runner: R)
3078 where
3079 R::Context: Metrics,
3080 {
3081 runner.start(|context| async move {
3082 let root_counter = context.register("root", "root metric", Counter::<u64>::default());
3083 root_counter.inc();
3084
3085 let child =
3086 context
3087 .child("engine")
3088 .register("ops", "child metric", Counter::<u64>::default());
3089 child.inc();
3090
3091 let buffer = context.encode();
3092 assert!(
3093 buffer.contains("root_total 1"),
3094 "root metric missing: {buffer}"
3095 );
3096 assert!(
3097 buffer.contains("engine_ops_total 1"),
3098 "child metric missing: {buffer}"
3099 );
3100 assert_eq!(
3101 buffer.matches("# EOF").count(),
3102 1,
3103 "expected exactly one EOF marker: {buffer}"
3104 );
3105 assert!(
3106 buffer.ends_with("# EOF\n"),
3107 "EOF must be the last line: {buffer}"
3108 );
3109 });
3110 }
3111
/// Runs the shared `test_encode_single_eof` scenario on a default deterministic runner.
#[test]
fn test_deterministic_encode_single_eof() {
    test_encode_single_eof(deterministic::Runner::default());
}
3117
/// Runs the shared `test_encode_single_eof` scenario on a default tokio runner.
#[test]
fn test_tokio_encode_single_eof() {
    test_encode_single_eof(tokio::Runner::default());
}
3123
3124 fn test_family_with_attributes<R: Runner>(runner: R)
3125 where
3126 R::Context: Metrics,
3127 {
3128 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
3129 struct Peer {
3130 name: String,
3131 }
3132 impl EncodeLabelSet for Peer {
3133 fn encode(&self, encoder: &mut LabelSetEncoder<'_>) -> Result<(), std::fmt::Error> {
3134 let mut label = encoder.encode_label();
3135 let mut key = label.encode_label_key()?;
3136 EncodeLabelKey::encode(&"peer", &mut key)?;
3137 let mut value = key.encode_label_value()?;
3138 EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
3139 value.finish()
3140 }
3141 }
3142
3143 runner.start(|context| async move {
3144 let family = context
3145 .child("batcher")
3146 .with_attribute("epoch", 1)
3147 .register(
3148 "votes",
3149 "votes per peer",
3150 Family::<Peer, Counter>::default(),
3151 );
3152 family
3153 .get_or_create(&Peer {
3154 name: "alice".into(),
3155 })
3156 .inc();
3157 family.get_or_create(&Peer { name: "bob".into() }).inc();
3158
3159 let buffer = context.encode();
3160 assert!(
3161 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
3162 "family with attributes should combine labels: {buffer}"
3163 );
3164 assert!(
3165 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
3166 "family with attributes should combine labels: {buffer}"
3167 );
3168
3169 drop(family);
3170 let buffer = context.encode();
3171 assert!(
3172 !buffer.contains("batcher_votes"),
3173 "family metrics should be removed: {buffer}"
3174 );
3175 });
3176 }
3177
/// Runs the shared `test_family_with_attributes` scenario on a default deterministic runner.
#[test]
fn test_deterministic_family_with_attributes() {
    test_family_with_attributes(deterministic::Runner::default());
}
3183
/// Runs the shared `test_family_with_attributes` scenario on a default tokio runner.
#[test]
fn test_tokio_family_with_attributes() {
    test_family_with_attributes(tokio::Runner::default());
}
3189
/// Runs the shared `test_error_future` scenario on a default deterministic runner.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
3195
/// Runs the shared `test_clock_sleep` scenario on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
3201
/// Runs the shared `test_clock_sleep_until` scenario on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
3207
/// Runs the shared `test_clock_sleep_until_far_future` scenario on a default deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until_far_future() {
    test_clock_sleep_until_far_future(deterministic::Runner::default());
}
3213
/// Runs the shared `test_clock_timeout` scenario on a default deterministic runner.
#[test]
fn test_deterministic_clock_timeout() {
    test_clock_timeout(deterministic::Runner::default());
}
3219
/// Runs the shared `test_root_finishes` scenario on a default deterministic runner.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
3225
/// Runs the shared `test_spawn_after_abort` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_after_abort() {
    test_spawn_after_abort(deterministic::Runner::default());
}
3231
/// Runs the shared `test_spawn_abort` scenario on a default deterministic
/// runner with both boolean flags set to `false`.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default(), false, false);
}
3237
/// Expects the shared `test_panic_aborts_root` scenario to panic with "blah"
/// on a default deterministic runner.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
3244
/// Expects the shared `test_panic_aborts_root` scenario to panic with "blah"
/// even when the deterministic runner is configured with `with_catch_panics(true)`.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
3252
/// Expects the shared `test_panic_aborts_spawn` scenario to panic with "blah"
/// on a default deterministic runner.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
3259
/// Runs the shared `test_panic_aborts_spawn_caught` scenario on a deterministic
/// runner configured with `with_catch_panics(true)`.
#[test]
fn test_deterministic_panic_aborts_spawn_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
3266
/// Expects the shared `test_multiple_panics` scenario to panic with "boom"
/// on a default deterministic runner.
#[test]
#[should_panic(expected = "boom")]
fn test_deterministic_multiple_panics() {
    test_multiple_panics(deterministic::Runner::default());
}
3273
/// Runs the shared `test_multiple_panics_caught` scenario on a deterministic
/// runner configured with `with_catch_panics(true)`.
#[test]
fn test_deterministic_multiple_panics_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
3280
/// Runs the shared `test_select` scenario on a default deterministic runner.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
3286
/// Runs the shared `test_select_loop` scenario on a default deterministic runner.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
3292
/// Runs the shared `test_storage_operations` scenario on a default deterministic runner.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
3298
/// Runs the shared `test_blob_read_write` scenario on a default deterministic runner.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
3304
/// Runs the shared `test_blob_resize` scenario on a default deterministic runner.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
3310
/// Runs the shared `test_many_partition_read_write` scenario on a default deterministic runner.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
3316
/// Runs the shared `test_blob_read_past_length` scenario on a default deterministic runner.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
3322
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on a default deterministic runner.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
3329
/// Runs the shared `test_shutdown` scenario on a default deterministic runner.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
3335
/// Runs the shared `test_shutdown_multiple_signals` scenario on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
3341
/// Runs the shared `test_shutdown_timeout` scenario on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
3347
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on a default deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
3353
/// Runs the shared `test_unfulfilled_shutdown` scenario on a default deterministic runner.
#[test]
fn test_deterministic_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(deterministic::Runner::default());
}
3359
/// Runs the shared `test_spawn_dedicated` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_dedicated() {
    test_spawn_dedicated(deterministic::Runner::default());
}
3365
/// Runs the shared `test_spawn` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn() {
    test_spawn(deterministic::Runner::default());
}
3371
/// Runs the shared `test_spawn_abort_on_parent_abort` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(deterministic::Runner::default());
}
3377
/// Runs the shared `test_spawn_abort_on_parent_completion` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(deterministic::Runner::default());
}
3383
/// Runs the shared `test_spawn_cascading_abort` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_cascading_abort() {
    test_spawn_cascading_abort(deterministic::Runner::default());
}
3389
/// Runs the shared `test_child_survives_sibling_completion` scenario on a default deterministic runner.
#[test]
fn test_deterministic_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(deterministic::Runner::default());
}
3395
/// Runs the shared `test_spawn_clone_chain` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_clone_chain() {
    test_spawn_clone_chain(deterministic::Runner::default());
}
3401
/// Runs the shared `test_spawn_sparse_clone_chain` scenario on a default deterministic runner.
#[test]
fn test_deterministic_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(deterministic::Runner::default());
}
3407
/// Runs the shared `test_spawn_blocking` scenario on a fresh default
/// deterministic runner for each value of the `dedicated` flag.
#[test]
fn test_deterministic_spawn_blocking() {
    [false, true]
        .into_iter()
        .for_each(|dedicated| test_spawn_blocking(deterministic::Runner::default(), dedicated));
}
3415
/// Expects the shared `test_spawn_blocking_panic` scenario to panic with
/// "blocking task panicked" on a default deterministic runner when
/// `dedicated` is `false`.
///
/// Each `dedicated` value has its own test: under `#[should_panic]` the first
/// panic unwinds out of the test function, so a loop over both values would
/// never reach the second one.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    test_spawn_blocking_panic(deterministic::Runner::default(), false);
}

/// Expects the shared `test_spawn_blocking_panic` scenario to panic with
/// "blocking task panicked" on a default deterministic runner when
/// `dedicated` is `true`.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic_dedicated() {
    test_spawn_blocking_panic(deterministic::Runner::default(), true);
}
3424
/// Runs the shared `test_spawn_blocking_panic_caught` scenario for each value
/// of the `dedicated` flag, each time on a fresh deterministic runner
/// configured with `with_catch_panics(true)`.
#[test]
fn test_deterministic_spawn_blocking_panic_caught() {
    [false, true].into_iter().for_each(|dedicated| {
        let runner =
            deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
        test_spawn_blocking_panic_caught(runner, dedicated);
    });
}
3433
/// Runs the shared `test_spawn_abort` scenario on a fresh default deterministic
/// runner for the flag pairs `(false, true)` and `(true, false)`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    let flag_pairs = [(false, true), (true, false)];
    flag_pairs.into_iter().for_each(|(dedicated, blocking)| {
        test_spawn_abort(deterministic::Runner::default(), dedicated, blocking);
    });
}
3441
/// Runs the shared `test_circular_reference_prevents_cleanup` scenario on a default deterministic runner.
#[test]
fn test_deterministic_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(deterministic::Runner::default());
}
3447
/// Runs the shared `test_late_waker` scenario on a default deterministic runner.
#[test]
fn test_deterministic_late_waker() {
    test_late_waker(deterministic::Runner::default());
}
3453
/// Runs the shared `test_metrics` scenario on a default deterministic runner.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
3459
3460 #[test_collect_traces]
3461 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
3462 let executor = deterministic::Runner::new(deterministic::Config::default());
3463 executor.start(|context| async move {
3464 context
3465 .child("test")
3466 .with_span()
3467 .spawn(|context| async move {
3468 tracing::info!(field = "test field", "test log");
3469
3470 context
3471 .child("inner")
3472 .with_span()
3473 .spawn(|_| async move {
3474 tracing::info!("inner log");
3475 })
3476 .await
3477 .unwrap();
3478 })
3479 .await
3480 .unwrap();
3481 });
3482
3483 let info_traces = traces.get_by_level(Level::INFO);
3484 assert_eq!(info_traces.len(), 2);
3485
3486 info_traces
3488 .expect_event_at_index(0, |event| {
3489 event.metadata.expect_content_exact("test log")?;
3490 event.metadata.expect_field_count(1)?;
3491 event.metadata.expect_field_exact("field", "test field")?;
3492 event.expect_span_count(1)?;
3493 event.expect_span_at_index(0, |span| {
3494 span.expect_content_exact("task")?;
3495 span.expect_field_count(1)?;
3496 span.expect_field_exact("name", "test")
3497 })
3498 })
3499 .unwrap();
3500
3501 info_traces
3502 .expect_event_at_index(1, |event| {
3503 event.metadata.expect_content_exact("inner log")?;
3504 event.metadata.expect_field_count(0)?;
3505 event.expect_span_count(1)?;
3506 event.expect_span_at_index(0, |span| {
3507 span.expect_content_exact("task")?;
3508 span.expect_field_count(1)?;
3509 span.expect_field_exact("name", "test_inner")
3510 })
3511 })
3512 .unwrap();
3513 }
3514
/// Exercises the deterministic resolver: a registered host resolves to its
/// addresses, an unregistered host fails, and registering `None` for a host
/// makes its resolution fail.
#[test]
fn test_deterministic_resolver() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        // Register two addresses for the host and resolve them back.
        let first: IpAddr = "192.168.1.1".parse().unwrap();
        let second: IpAddr = "192.168.1.2".parse().unwrap();
        context.resolver_register("example.com", Some(vec![first, second]));

        let resolved = context.resolve("example.com").await.unwrap();
        assert_eq!(resolved, vec![first, second]);

        // A host that was never registered fails to resolve.
        let unknown = context.resolve("unknown.com").await;
        assert!(matches!(unknown, Err(Error::ResolveFailed(_))));

        // Registering `None` makes the previously known host fail too.
        context.resolver_register("example.com", None);
        let removed = context.resolve("example.com").await;
        assert!(matches!(removed, Err(Error::ResolveFailed(_))));
    });
}
3538
/// Runs the shared `test_error_future` scenario on a default tokio runner.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
3544
/// Runs the shared `test_clock_sleep` scenario on a default tokio runner.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
3550
/// Runs the shared `test_clock_sleep_until` scenario on a default tokio runner.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
3556
/// Runs the shared `test_clock_sleep_until_far_future` scenario on a default tokio runner.
#[test]
fn test_tokio_clock_sleep_until_far_future() {
    test_clock_sleep_until_far_future(tokio::Runner::default());
}
3562
/// Runs the shared `test_clock_timeout` scenario on a default tokio runner.
#[test]
fn test_tokio_clock_timeout() {
    test_clock_timeout(tokio::Runner::default());
}
3568
/// Runs the shared `test_root_finishes` scenario on a default tokio runner.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
3574
/// Runs the shared `test_spawn_after_abort` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_after_abort() {
    test_spawn_after_abort(tokio::Runner::default());
}
3580
/// Runs the shared `test_spawn_abort` scenario on a default tokio runner with
/// both boolean flags set to `false`.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default(), false, false);
}
3586
/// Expects the shared `test_panic_aborts_root` scenario to panic with "blah"
/// on a default tokio runner.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
3593
/// Expects the shared `test_panic_aborts_root` scenario to panic with "blah"
/// even when the tokio runner is configured with `with_catch_panics(true)`.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
3601
/// Expects the shared `test_panic_aborts_spawn` scenario to panic with "blah"
/// on a default tokio runner.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
3608
/// Runs the shared `test_panic_aborts_spawn_caught` scenario on a tokio runner
/// configured with `with_catch_panics(true)`.
#[test]
fn test_tokio_panic_aborts_spawn_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
3615
/// Expects the shared `test_multiple_panics` scenario to panic with "boom"
/// on a default tokio runner.
#[test]
#[should_panic(expected = "boom")]
fn test_tokio_multiple_panics() {
    test_multiple_panics(tokio::Runner::default());
}
3622
/// Runs the shared `test_multiple_panics_caught` scenario on a tokio runner
/// configured with `with_catch_panics(true)`.
#[test]
fn test_tokio_multiple_panics_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
3629
/// Runs the shared `test_select` scenario on a default tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
3635
/// Runs the shared `test_select_loop` scenario on a default tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
3641
/// Runs the shared `test_storage_operations` scenario on a default tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
3647
/// Runs the shared `test_blob_read_write` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
3653
/// Runs the shared `test_blob_resize` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
3659
/// Runs the shared `test_many_partition_read_write` scenario on a default tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
3665
/// Runs the shared `test_blob_read_past_length` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
3671
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
3678
/// Runs the shared `test_shutdown` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
3684
/// Runs the shared `test_shutdown_multiple_signals` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
3690
/// Runs the shared `test_shutdown_timeout` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
3696
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
3702
/// Runs the shared `test_unfulfilled_shutdown` scenario on a default tokio runner.
#[test]
fn test_tokio_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(tokio::Runner::default());
}
3708
/// Runs the shared `test_spawn_dedicated` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_dedicated() {
    test_spawn_dedicated(tokio::Runner::default());
}
3714
/// Runs the shared `test_spawn` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn() {
    test_spawn(tokio::Runner::default());
}
3720
/// Runs the shared `test_spawn_abort_on_parent_abort` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(tokio::Runner::default());
}
3726
/// Runs the shared `test_spawn_abort_on_parent_completion` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(tokio::Runner::default());
}
3732
/// Runs the shared `test_spawn_cascading_abort` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_cascading_abort() {
    test_spawn_cascading_abort(tokio::Runner::default());
}
3738
/// Runs the shared `test_child_survives_sibling_completion` scenario on a default tokio runner.
#[test]
fn test_tokio_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(tokio::Runner::default());
}
3744
/// Runs the shared `test_spawn_clone_chain` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_clone_chain() {
    test_spawn_clone_chain(tokio::Runner::default());
}
3750
/// Runs the shared `test_spawn_sparse_clone_chain` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(tokio::Runner::default());
}
3756
/// Runs the shared `test_spawn_blocking` scenario on a fresh default tokio
/// runner for each value of the `dedicated` flag.
#[test]
fn test_tokio_spawn_blocking() {
    [false, true]
        .into_iter()
        .for_each(|dedicated| test_spawn_blocking(tokio::Runner::default(), dedicated));
}
3764
/// Expects the shared `test_spawn_blocking_panic` scenario to panic with
/// "blocking task panicked" on a default tokio runner when `dedicated` is
/// `false`.
///
/// Each `dedicated` value has its own test: under `#[should_panic]` the first
/// panic unwinds out of the test function, so a loop over both values would
/// never reach the second one.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic() {
    test_spawn_blocking_panic(tokio::Runner::default(), false);
}

/// Expects the shared `test_spawn_blocking_panic` scenario to panic with
/// "blocking task panicked" on a default tokio runner when `dedicated` is
/// `true`.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic_dedicated() {
    test_spawn_blocking_panic(tokio::Runner::default(), true);
}
3773
/// Runs the shared `test_spawn_blocking_panic_caught` scenario for each value
/// of the `dedicated` flag, each time on a fresh tokio runner configured with
/// `with_catch_panics(true)`.
#[test]
fn test_tokio_spawn_blocking_panic_caught() {
    [false, true].into_iter().for_each(|dedicated| {
        let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
        test_spawn_blocking_panic_caught(runner, dedicated);
    });
}
3782
/// Runs the shared `test_spawn_abort` scenario on a fresh default tokio runner
/// for the flag pairs `(false, true)` and `(true, false)`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    let flag_pairs = [(false, true), (true, false)];
    flag_pairs.into_iter().for_each(|(dedicated, blocking)| {
        test_spawn_abort(tokio::Runner::default(), dedicated, blocking);
    });
}
3790
/// Runs the shared `test_circular_reference_prevents_cleanup` scenario on a default tokio runner.
#[test]
fn test_tokio_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(tokio::Runner::default());
}
3796
/// Runs the shared `test_late_waker` scenario on a default tokio runner.
#[test]
fn test_tokio_late_waker() {
    test_late_waker(tokio::Runner::default());
}
3802
/// Runs the shared `test_metrics` scenario on a default tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
3808
/// Polls the encoded metrics of a default tokio runner until an unlabeled
/// `runtime_process_rss` sample with a positive value is found.
///
/// The task sleeps after every unsuccessful poll, including when the metric is
/// already present but not yet positive, so it never re-polls in a tight loop
/// without yielding.
///
/// # Panics
///
/// Panics if the value of a matching sample line cannot be parsed as `i64`.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Consider only unlabeled sample lines for the metric and take the
            // second whitespace-separated token as the value.
            let has_positive_rss = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                .filter_map(|line| line.split_whitespace().nth(1))
                .any(|value| value.parse::<i64>().expect("Failed to parse RSS value") > 0);
            if has_positive_rss {
                return;
            }

            // Metric absent or not yet positive: wait before polling again.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
3839
/// Initializes tokio telemetry with a metrics address, registers a counter,
/// and then issues a raw `GET /metrics` request to that address, checking the
/// status line and that the counter appears in the response body.
#[test]
fn test_tokio_telemetry() {
    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        // NOTE(review): the port is fixed at 8000; this presumably fails if the
        // port is already in use on the host — confirm.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry; the client below expects to reach `/metrics` at `address`.
        let logging = tokio::telemetry::Logging {
            level: Level::INFO,
            json: false,
        };
        tokio::telemetry::init(context.child("metrics"), logging, Some(address), None);

        // Register a counter and bump it so the body has a known sample.
        let counter: Counter<u64> = Counter::default();
        let _registered = context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads one byte at a time up to `\n`, returning the line without its
        /// trailing `\n` or `\r\n`.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut bytes = Vec::new();
            loop {
                let received = stream.recv(1).await?;
                let byte = received.coalesce().as_ref()[0];
                if byte != b'\n' {
                    bytes.push(byte);
                    continue;
                }
                // End of line: strip a preceding carriage return, if any.
                if bytes.last() == Some(&b'\r') {
                    bytes.pop();
                }
                break;
            }
            String::from_utf8(bytes).map_err(|_| Error::ReadFailed)
        }

        /// Reads `name: value` lines until an empty line; lines without the
        /// `": "` separator are ignored.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    return Ok(headers);
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_string(), value.to_string());
                }
            }
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let received = stream.recv(content_length).await?;
            String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
        }

        let client = context.child("client").spawn(move |context| async move {
            // Retry the dial every 10ms until it succeeds.
            let (mut sink, mut stream) = loop {
                match context.dial(address).await {
                    Ok(connection) => break connection,
                    Err(e) => {
                        error!(err =?e, "failed to connect");
                        context.sleep(Duration::from_millis(10)).await;
                    }
                }
            };

            // Send the request.
            let request = format!(
                "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
            );
            sink.send(Bytes::from(request)).await.unwrap();

            // Status line.
            let status_line = read_line(&mut stream).await.unwrap();
            assert_eq!(status_line, "HTTP/1.1 200 OK");

            // Headers, from which the body length is taken.
            let headers = read_headers(&mut stream).await.unwrap();
            println!("Headers: {headers:?}");
            let length_header = headers.get("content-length").unwrap();
            let content_length = length_header.parse::<usize>().unwrap();

            // Body must contain the counter sample.
            let body = read_body(&mut stream, content_length).await.unwrap();
            assert!(body.contains("test_counter_total 1"));
        });

        client.await.unwrap();
    });
}
3946
/// Resolves `localhost` on a default tokio runner and checks that the result
/// is non-empty and that every address is the IPv4 or IPv6 `LOCALHOST` constant.
#[test]
fn test_tokio_resolver() {
    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        let resolved = context.resolve("localhost").await.unwrap();
        assert!(!resolved.is_empty());

        let allowed = [
            IpAddr::V4(Ipv4Addr::LOCALHOST),
            IpAddr::V6(Ipv6Addr::LOCALHOST),
        ];
        for addr in resolved {
            assert!(allowed.contains(&addr));
        }
    });
}
3961
/// Creates a four-thread pool from a child context of a default tokio runner
/// and sums `0..10000` in parallel inside it.
#[test]
fn test_create_thread_pool_tokio() {
    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        let pool = context
            .child("pool")
            .create_thread_pool(NZUsize!(4))
            .unwrap();

        let values: Vec<i32> = (0..10000).collect();

        // Run the parallel sum inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, 10000 * 9999 / 2);
        });
    });
}
3981
/// Creates a four-thread pool from a child context of a default deterministic
/// runner and sums `0..10000` in parallel inside it.
#[test]
fn test_create_thread_pool_deterministic() {
    let runner = deterministic::Runner::default();
    runner.start(|context| async move {
        let pool = context
            .child("pool")
            .create_thread_pool(NZUsize!(4))
            .unwrap();

        let values: Vec<i32> = (0..10000).collect();

        // Run the parallel sum inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, 10000 * 9999 / 2);
        });
    });
}
4001
4002 fn test_buffer_pooler<R: Runner>(
4003 runner: R,
4004 expected_network_max_per_class: u32,
4005 expected_storage_max_per_class: u32,
4006 ) where
4007 R::Context: BufferPooler,
4008 {
4009 runner.start(|context| async move {
4010 let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
4012 assert!(net_buf.capacity() >= 1024);
4013
4014 let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
4016 assert!(storage_buf.capacity() >= 4096);
4017
4018 assert_eq!(
4020 context.network_buffer_pool().config().max_per_class.get(),
4021 expected_network_max_per_class
4022 );
4023 assert_eq!(
4024 context.storage_buffer_pool().config().max_per_class.get(),
4025 expected_storage_max_per_class
4026 );
4027 });
4028 }
4029
/// Runs `test_buffer_pooler` on a default deterministic runner (expecting
/// limits 4096 and 64), then on one whose network and storage pool configs set
/// `max_per_class` to 64 and 8.
#[test]
fn test_deterministic_buffer_pooler() {
    // Default configuration.
    test_buffer_pooler(deterministic::Runner::default(), 4096, 64);

    // Overridden per-class limits.
    let network = BufferPoolConfig::for_network().with_max_per_class(NZU32!(64));
    let storage = BufferPoolConfig::for_storage().with_max_per_class(NZU32!(8));
    let cfg = deterministic::Config::default()
        .with_network_buffer_pool_config(network)
        .with_storage_buffer_pool_config(storage);
    test_buffer_pooler(deterministic::Runner::new(cfg), 64, 8);
}
4045
/// Runs `test_buffer_pooler` on a default tokio runner (expecting limits 4096
/// and 64), then on one whose network and storage pool configs set
/// `max_per_class` to 64 and 8.
#[test]
fn test_tokio_buffer_pooler() {
    // Default configuration.
    test_buffer_pooler(tokio::Runner::default(), 4096, 64);

    // Overridden per-class limits.
    let network = BufferPoolConfig::for_network().with_max_per_class(NZU32!(64));
    let storage = BufferPoolConfig::for_storage().with_max_per_class(NZU32!(8));
    let cfg = tokio::Config::default()
        .with_network_buffer_pool_config(network)
        .with_storage_buffer_pool_config(storage);
    test_buffer_pooler(tokio::Runner::new(cfg), 64, 8);
}
4061}