1#![doc(
20 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21 html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
33stability_scope!(ALPHA {
34 pub mod deterministic;
35 pub mod mocks;
36});
37stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
38 pub mod benchmarks;
39});
40stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
41 mod iouring;
42});
43stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
44 pub mod tokio;
45});
46stability_scope!(BETA {
47 use commonware_macros::select;
48 use commonware_parallel::{Rayon, ThreadPool};
49 use iobuf::PoolError;
50 use prometheus_client::registry::Metric;
51 use rayon::ThreadPoolBuildError;
52 use std::{
53 future::Future,
54 io::Error as IoError,
55 net::SocketAddr,
56 num::NonZeroUsize,
57 time::{Duration, SystemTime},
58 };
59 use thiserror::Error;
60
61 pub(crate) const METRICS_PREFIX: &str = "runtime";
63
64 pub use bytes::{Buf, BufMut};
66 pub use governor::Quota;
68
69 pub mod iobuf;
70 pub use iobuf::{BufferPool, BufferPoolConfig, IoBuf, IoBufMut, IoBufs, IoBufsMut};
71
72 pub mod utils;
73 pub use utils::*;
74
75 pub mod telemetry;
76
77 pub const DEFAULT_BLOB_VERSION: u16 = 0;
79
80 #[derive(Error, Debug)]
82 pub enum Error {
83 #[error("exited")]
84 Exited,
85 #[error("closed")]
86 Closed,
87 #[error("timeout")]
88 Timeout,
89 #[error("bind failed")]
90 BindFailed,
91 #[error("connection failed")]
92 ConnectionFailed,
93 #[error("write failed")]
94 WriteFailed,
95 #[error("read failed")]
96 ReadFailed,
97 #[error("send failed")]
98 SendFailed,
99 #[error("recv failed")]
100 RecvFailed,
101 #[error("dns resolution failed: {0}")]
102 ResolveFailed(String),
103 #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
104 PartitionNameInvalid(String),
105 #[error("partition creation failed: {0}")]
106 PartitionCreationFailed(String),
107 #[error("partition missing: {0}")]
108 PartitionMissing(String),
109 #[error("partition corrupt: {0}")]
110 PartitionCorrupt(String),
111 #[error("blob open failed: {0}/{1} error: {2}")]
112 BlobOpenFailed(String, String, IoError),
113 #[error("blob missing: {0}/{1}")]
114 BlobMissing(String, String),
115 #[error("blob resize failed: {0}/{1} error: {2}")]
116 BlobResizeFailed(String, String, IoError),
117 #[error("blob sync failed: {0}/{1} error: {2}")]
118 BlobSyncFailed(String, String, IoError),
119 #[error("blob insufficient length")]
120 BlobInsufficientLength,
121 #[error("blob corrupt: {0}/{1} reason: {2}")]
122 BlobCorrupt(String, String, String),
123 #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
124 BlobVersionMismatch {
125 expected: std::ops::RangeInclusive<u16>,
126 found: u16,
127 },
128 #[error("invalid or missing checksum")]
129 InvalidChecksum,
130 #[error("offset overflow")]
131 OffsetOverflow,
132 #[error("io error: {0}")]
133 Io(#[from] IoError),
134 #[error("buffer pool: {0}")]
135 Pool(#[from] PoolError),
136 }
137
138 pub trait Runner {
141 type Context;
147
148 fn start<F, Fut>(self, f: F) -> Fut::Output
154 where
155 F: FnOnce(Self::Context) -> Fut,
156 Fut: Future;
157 }
158
159 pub trait Spawner: Clone + Send + Sync + 'static {
161 fn shared(self, blocking: bool) -> Self;
170
171 fn dedicated(self) -> Self;
178
179 fn instrumented(self) -> Self;
181
182 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
216 where
217 F: FnOnce(Self) -> Fut + Send + 'static,
218 Fut: Future<Output = T> + Send + 'static,
219 T: Send + 'static;
220
221 fn stop(
241 self,
242 value: i32,
243 timeout: Option<Duration>,
244 ) -> impl Future<Output = Result<(), Error>> + Send;
245
246 fn stopped(&self) -> signal::Signal;
253 }
254
255 pub trait ThreadPooler: Spawner + Metrics {
258 fn create_thread_pool(
267 &self,
268 concurrency: NonZeroUsize,
269 ) -> Result<ThreadPool, ThreadPoolBuildError>;
270
271 fn create_strategy(
280 &self,
281 concurrency: NonZeroUsize,
282 ) -> Result<Rayon, ThreadPoolBuildError> {
283 self.create_thread_pool(concurrency).map(Rayon::with_pool)
284 }
285 }
286
287 pub trait Metrics: Clone + Send + Sync + 'static {
289 fn label(&self) -> String;
291
292 fn with_label(&self, label: &str) -> Self;
300
301 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;
388
389 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
395
396 fn encode(&self) -> String;
403
404 fn with_scope(&self) -> Self;
435 }
436
437 pub type RateLimiter<C> = governor::RateLimiter<
442 governor::state::NotKeyed,
443 governor::state::InMemoryState,
444 C,
445 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
446 >;
447
448 pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
455 K,
456 governor::state::keyed::HashMapStateStore<K>,
457 C,
458 governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
459 >;
460
461 pub trait Clock:
467 governor::clock::Clock<Instant = SystemTime>
468 + governor::clock::ReasonablyRealtime
469 + Clone
470 + Send
471 + Sync
472 + 'static
473 {
474 fn current(&self) -> SystemTime;
476
477 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
479
480 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
482
483 fn timeout<F, T>(
504 &self,
505 duration: Duration,
506 future: F,
507 ) -> impl Future<Output = Result<T, Error>> + Send + '_
508 where
509 F: Future<Output = T> + Send + 'static,
510 T: Send + 'static,
511 {
512 async move {
513 select! {
514 result = future => Ok(result),
515 _ = self.sleep(duration) => Err(Error::Timeout),
516 }
517 }
518 }
519 }
520
521 pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
523
524 pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
526
527 pub type ListenerOf<N> = <N as crate::Network>::Listener;
529
530 pub trait Network: Clone + Send + Sync + 'static {
533 type Listener: Listener;
537
538 fn bind(
540 &self,
541 socket: SocketAddr,
542 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
543
544 fn dial(
546 &self,
547 socket: SocketAddr,
548 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
549 }
550
551 pub trait Resolver: Clone + Send + Sync + 'static {
553 fn resolve(
557 &self,
558 host: &str,
559 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
560 }
561
562 pub trait Listener: Sync + Send + 'static {
565 type Sink: Sink;
568 type Stream: Stream;
571
572 fn accept(
574 &mut self,
575 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
576
577 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
579 }
580
581 pub trait Sink: Sync + Send + 'static {
584 fn send(
590 &mut self,
591 bufs: impl Into<IoBufs> + Send,
592 ) -> impl Future<Output = Result<(), Error>> + Send;
593 }
594
595 pub trait Stream: Sync + Send + 'static {
598 fn recv(&mut self, len: usize) -> impl Future<Output = Result<IoBufs, Error>> + Send;
606
607 fn peek(&self, max_len: usize) -> &[u8];
615 }
616
617 pub trait Storage: Clone + Send + Sync + 'static {
630 type Blob: Blob;
632
633 fn open(
636 &self,
637 partition: &str,
638 name: &[u8],
639 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
640 async move {
641 let (blob, size, _) = self
642 .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
643 .await?;
644 Ok((blob, size))
645 }
646 }
647
648 fn open_versioned(
665 &self,
666 partition: &str,
667 name: &[u8],
668 versions: std::ops::RangeInclusive<u16>,
669 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
670
671 fn remove(
677 &self,
678 partition: &str,
679 name: Option<&[u8]>,
680 ) -> impl Future<Output = Result<(), Error>> + Send;
681
682 fn scan(&self, partition: &str)
684 -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
685 }
686
687 #[allow(clippy::len_without_is_empty)]
702 pub trait Blob: Clone + Send + Sync + 'static {
703 fn read_at_buf(
719 &self,
720 offset: u64,
721 len: usize,
722 bufs: impl Into<IoBufsMut> + Send,
723 ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
724
725 fn read_at(
730 &self,
731 offset: u64,
732 len: usize,
733 ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;
734
735 fn write_at(
737 &self,
738 offset: u64,
739 bufs: impl Into<IoBufs> + Send,
740 ) -> impl Future<Output = Result<(), Error>> + Send;
741
742 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
747
748 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
750 }
751
752 pub trait BufferPooler: Clone + Send + Sync + 'static {
754 fn network_buffer_pool(&self) -> &BufferPool;
756
757 fn storage_buffer_pool(&self) -> &BufferPool;
759 }
760});
761stability_scope!(BETA, cfg(feature = "external") {
762 pub trait Pacer: Clock + Clone + Send + Sync + 'static {
764 fn pace<'a, F, T>(
784 &'a self,
785 latency: Duration,
786 future: F,
787 ) -> impl Future<Output = T> + Send + 'a
788 where
789 F: Future<Output = T> + Send + 'a,
790 T: Send + 'a;
791 }
792
793 pub trait FutureExt: Future + Send + Sized {
798 fn pace<'a, E>(
800 self,
801 pacer: &'a E,
802 latency: Duration,
803 ) -> impl Future<Output = Self::Output> + Send + 'a
804 where
805 E: Pacer + 'a,
806 Self: Send + 'a,
807 Self::Output: Send + 'a,
808 {
809 pacer.pace(latency, self)
810 }
811 }
812
813 impl<F> FutureExt for F where F: Future + Send {}
814});
815
816#[cfg(test)]
817mod tests {
818 use super::*;
819 use crate::telemetry::traces::collector::TraceStorage;
820 use bytes::Bytes;
821 use commonware_macros::{select, test_collect_traces};
822 use commonware_utils::{
823 channel::{mpsc, oneshot},
824 sync::Mutex,
825 NZUsize,
826 };
827 use futures::{
828 future::{pending, ready},
829 join, pin_mut, FutureExt,
830 };
831 use prometheus_client::{
832 encoding::{EncodeLabelKey, EncodeLabelSet, EncodeLabelValue},
833 metrics::{counter::Counter, family::Family},
834 };
835 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
836 use std::{
837 collections::HashMap,
838 net::{IpAddr, Ipv4Addr, Ipv6Addr},
839 pin::Pin,
840 str::FromStr,
841 sync::{
842 atomic::{AtomicU32, Ordering},
843 Arc,
844 },
845 task::{Context as TContext, Poll, Waker},
846 };
847 use tracing::{error, Level};
848 use utils::reschedule;
849
850 fn test_error_future<R: Runner>(runner: R) {
851 #[allow(clippy::unused_async)]
852 async fn error_future() -> Result<&'static str, &'static str> {
853 Err("An error occurred")
854 }
855 let result = runner.start(|_| error_future());
856 assert_eq!(result, Err("An error occurred"));
857 }
858
859 fn test_clock_sleep<R: Runner>(runner: R)
860 where
861 R::Context: Spawner + Clock,
862 {
863 runner.start(|context| async move {
864 let start = context.current();
866 let sleep_duration = Duration::from_millis(10);
867 context.sleep(sleep_duration).await;
868
869 let end = context.current();
871 assert!(end.duration_since(start).unwrap() >= sleep_duration);
872 });
873 }
874
875 fn test_clock_sleep_until<R: Runner>(runner: R)
876 where
877 R::Context: Spawner + Clock + Metrics,
878 {
879 runner.start(|context| async move {
880 let now = context.current();
882 context.sleep_until(now + Duration::from_millis(100)).await;
883
884 let elapsed = now.elapsed().unwrap();
886 assert!(elapsed >= Duration::from_millis(100));
887 });
888 }
889
890 fn test_clock_timeout<R: Runner>(runner: R)
891 where
892 R::Context: Spawner + Clock,
893 {
894 runner.start(|context| async move {
895 let result = context
897 .timeout(Duration::from_millis(100), async { "success" })
898 .await;
899 assert_eq!(result.unwrap(), "success");
900
901 let result = context
903 .timeout(Duration::from_millis(50), pending::<()>())
904 .await;
905 assert!(matches!(result, Err(Error::Timeout)));
906
907 let result = context
909 .timeout(
910 Duration::from_millis(100),
911 context.sleep(Duration::from_millis(50)),
912 )
913 .await;
914 assert!(result.is_ok());
915 });
916 }
917
918 fn test_root_finishes<R: Runner>(runner: R)
919 where
920 R::Context: Spawner,
921 {
922 runner.start(|context| async move {
923 context.spawn(|_| async move {
924 loop {
925 reschedule().await;
926 }
927 });
928 });
929 }
930
931 fn test_spawn_after_abort<R>(runner: R)
932 where
933 R: Runner,
934 R::Context: Spawner + Clone,
935 {
936 runner.start(|context| async move {
937 let child = context.clone();
939
940 let parent_handle = context.spawn(move |_| async move {
942 pending::<()>().await;
943 });
944 parent_handle.abort();
945
946 let child_handle = child.spawn(move |_| async move {
948 pending::<()>().await;
949 });
950 assert!(matches!(child_handle.await, Err(Error::Closed)));
951 });
952 }
953
954 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
955 where
956 R::Context: Spawner,
957 {
958 runner.start(|context| async move {
959 let context = if dedicated {
960 assert!(!blocking);
961 context.dedicated()
962 } else {
963 context.shared(blocking)
964 };
965
966 let handle = context.spawn(|_| async move {
967 loop {
968 reschedule().await;
969 }
970 });
971 handle.abort();
972 assert!(matches!(handle.await, Err(Error::Closed)));
973 });
974 }
975
976 fn test_panic_aborts_root<R: Runner>(runner: R) {
977 let result: Result<(), Error> = runner.start(|_| async move {
978 panic!("blah");
979 });
980 result.unwrap_err();
981 }
982
983 fn test_panic_aborts_spawn<R: Runner>(runner: R)
984 where
985 R::Context: Spawner + Clock,
986 {
987 runner.start(|context| async move {
988 context.clone().spawn(|_| async move {
989 panic!("blah");
990 });
991
992 loop {
994 context.sleep(Duration::from_millis(100)).await;
995 }
996 });
997 }
998
999 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
1000 where
1001 R::Context: Spawner + Clock,
1002 {
1003 let result: Result<(), Error> = runner.start(|context| async move {
1004 let result = context.clone().spawn(|_| async move {
1005 panic!("blah");
1006 });
1007 result.await
1008 });
1009 assert!(matches!(result, Err(Error::Exited)));
1010 }
1011
1012 fn test_multiple_panics<R: Runner>(runner: R)
1013 where
1014 R::Context: Spawner + Clock,
1015 {
1016 runner.start(|context| async move {
1017 context.clone().spawn(|_| async move {
1018 panic!("boom 1");
1019 });
1020 context.clone().spawn(|_| async move {
1021 panic!("boom 2");
1022 });
1023 context.clone().spawn(|_| async move {
1024 panic!("boom 3");
1025 });
1026
1027 loop {
1029 context.sleep(Duration::from_millis(100)).await;
1030 }
1031 });
1032 }
1033
1034 fn test_multiple_panics_caught<R: Runner>(runner: R)
1035 where
1036 R::Context: Spawner + Clock,
1037 {
1038 let (res1, res2, res3) = runner.start(|context| async move {
1039 let handle1 = context.clone().spawn(|_| async move {
1040 panic!("boom 1");
1041 });
1042 let handle2 = context.clone().spawn(|_| async move {
1043 panic!("boom 2");
1044 });
1045 let handle3 = context.clone().spawn(|_| async move {
1046 panic!("boom 3");
1047 });
1048
1049 join!(handle1, handle2, handle3)
1050 });
1051 assert!(matches!(res1, Err(Error::Exited)));
1052 assert!(matches!(res2, Err(Error::Exited)));
1053 assert!(matches!(res3, Err(Error::Exited)));
1054 }
1055
1056 fn test_select<R: Runner>(runner: R) {
1057 runner.start(|_| async move {
1058 let output = Mutex::new(0);
1060 select! {
1061 v1 = ready(1) => {
1062 *output.lock() = v1;
1063 },
1064 v2 = ready(2) => {
1065 *output.lock() = v2;
1066 },
1067 };
1068 assert_eq!(*output.lock(), 1);
1069
1070 select! {
1072 v1 = std::future::pending::<i32>() => {
1073 *output.lock() = v1;
1074 },
1075 v2 = ready(2) => {
1076 *output.lock() = v2;
1077 },
1078 };
1079 assert_eq!(*output.lock(), 2);
1080 });
1081 }
1082
1083 fn test_select_loop<R: Runner>(runner: R)
1085 where
1086 R::Context: Clock,
1087 {
1088 runner.start(|context| async move {
1089 let (sender, mut receiver) = mpsc::unbounded_channel();
1091 for _ in 0..2 {
1092 select! {
1093 v = receiver.recv() => {
1094 panic!("unexpected value: {v:?}");
1095 },
1096 _ = context.sleep(Duration::from_millis(100)) => {
1097 continue;
1098 },
1099 };
1100 }
1101
1102 sender.send(0).unwrap();
1104 sender.send(1).unwrap();
1105
1106 select! {
1108 _ = async {} => {
1109 },
1111 v = receiver.recv() => {
1112 panic!("unexpected value: {v:?}");
1113 },
1114 };
1115
1116 for i in 0..2 {
1118 select! {
1119 _ = context.sleep(Duration::from_millis(100)) => {
1120 panic!("timeout");
1121 },
1122 v = receiver.recv() => {
1123 assert_eq!(v.unwrap(), i);
1124 },
1125 };
1126 }
1127 });
1128 }
1129
1130 fn test_storage_operations<R: Runner>(runner: R)
1131 where
1132 R::Context: Storage,
1133 {
1134 runner.start(|context| async move {
1135 let partition = "test_partition";
1136 let name = b"test_blob";
1137
1138 let (blob, size) = context
1140 .open(partition, name)
1141 .await
1142 .expect("Failed to open blob");
1143 assert_eq!(size, 0, "new blob should have size 0");
1144
1145 let data = b"Hello, Storage!";
1147 blob.write_at(0, data)
1148 .await
1149 .expect("Failed to write to blob");
1150
1151 blob.sync().await.expect("Failed to sync blob");
1153
1154 let read = blob
1156 .read_at(0, data.len())
1157 .await
1158 .expect("Failed to read from blob");
1159 assert_eq!(read.coalesce(), data);
1160
1161 blob.sync().await.expect("Failed to sync blob");
1163
1164 let blobs = context
1166 .scan(partition)
1167 .await
1168 .expect("Failed to scan partition");
1169 assert!(blobs.contains(&name.to_vec()));
1170
1171 let (blob, len) = context
1173 .open(partition, name)
1174 .await
1175 .expect("Failed to reopen blob");
1176 assert_eq!(len, data.len() as u64);
1177
1178 let read = blob.read_at(7, 7).await.expect("Failed to read data");
1180 assert_eq!(read.coalesce(), b"Storage");
1181
1182 blob.sync().await.expect("Failed to sync blob");
1184
1185 context
1187 .remove(partition, Some(name))
1188 .await
1189 .expect("Failed to remove blob");
1190
1191 let blobs = context
1193 .scan(partition)
1194 .await
1195 .expect("Failed to scan partition");
1196 assert!(!blobs.contains(&name.to_vec()));
1197
1198 context
1200 .remove(partition, None)
1201 .await
1202 .expect("Failed to remove partition");
1203
1204 let result = context.scan(partition).await;
1206 assert!(matches!(result, Err(Error::PartitionMissing(_))));
1207 });
1208 }
1209
1210 fn test_blob_read_write<R: Runner>(runner: R)
1211 where
1212 R::Context: Storage,
1213 {
1214 runner.start(|context| async move {
1215 let partition = "test_partition";
1216 let name = b"test_blob_rw";
1217
1218 let (blob, _) = context
1220 .open(partition, name)
1221 .await
1222 .expect("Failed to open blob");
1223
1224 let data1 = b"Hello";
1226 let data2 = b"World";
1227 blob.write_at(0, data1)
1228 .await
1229 .expect("Failed to write data1");
1230 blob.write_at(5, data2)
1231 .await
1232 .expect("Failed to write data2");
1233
1234 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1236 let read = read.coalesce();
1237 assert_eq!(&read.as_ref()[..5], data1);
1238 assert_eq!(&read.as_ref()[5..], data2);
1239
1240 let result = blob.read_at(10, 10).await;
1242 assert!(result.is_err());
1243
1244 let data3 = b"Store";
1246 blob.write_at(5, data3)
1247 .await
1248 .expect("Failed to write data3");
1249
1250 let read = blob.read_at(0, 10).await.expect("Failed to read data");
1252 let read = read.coalesce();
1253 assert_eq!(&read.as_ref()[..5], data1);
1254 assert_eq!(&read.as_ref()[5..], data3);
1255
1256 let result = blob.read_at(10, 10).await;
1258 assert!(result.is_err());
1259 });
1260 }
1261
1262 fn test_blob_resize<R: Runner>(runner: R)
1263 where
1264 R::Context: Storage,
1265 {
1266 runner.start(|context| async move {
1267 let partition = "test_partition_resize";
1268 let name = b"test_blob_resize";
1269
1270 let (blob, _) = context
1272 .open(partition, name)
1273 .await
1274 .expect("Failed to open blob");
1275
1276 let data = b"some data";
1277 blob.write_at(0, data.to_vec())
1278 .await
1279 .expect("Failed to write");
1280 blob.sync().await.expect("Failed to sync after write");
1281
1282 let (blob, len) = context.open(partition, name).await.unwrap();
1284 assert_eq!(len, data.len() as u64);
1285
1286 let new_len = (data.len() as u64) * 2;
1288 blob.resize(new_len)
1289 .await
1290 .expect("Failed to resize to extend");
1291 blob.sync().await.expect("Failed to sync after resize");
1292
1293 let (blob, len) = context.open(partition, name).await.unwrap();
1295 assert_eq!(len, new_len);
1296
1297 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1299 assert_eq!(read_buf.coalesce(), data);
1300
1301 let extended_part = blob.read_at(data.len() as u64, data.len()).await.unwrap();
1303 assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());
1304
1305 blob.resize(data.len() as u64).await.unwrap();
1307 blob.sync().await.unwrap();
1308
1309 let (blob, size) = context.open(partition, name).await.unwrap();
1311 assert_eq!(size, data.len() as u64);
1312
1313 let read_buf = blob.read_at(0, data.len()).await.unwrap();
1315 assert_eq!(read_buf.coalesce(), data);
1316 blob.sync().await.unwrap();
1317 });
1318 }
1319
1320 fn test_many_partition_read_write<R: Runner>(runner: R)
1321 where
1322 R::Context: Storage,
1323 {
1324 runner.start(|context| async move {
1325 let partitions = ["partition1", "partition2", "partition3"];
1326 let name = b"test_blob_rw";
1327 let data1 = b"Hello";
1328 let data2 = b"World";
1329
1330 for (additional, partition) in partitions.iter().enumerate() {
1331 let (blob, _) = context
1333 .open(partition, name)
1334 .await
1335 .expect("Failed to open blob");
1336
1337 blob.write_at(0, data1)
1339 .await
1340 .expect("Failed to write data1");
1341 blob.write_at(5 + additional as u64, data2)
1342 .await
1343 .expect("Failed to write data2");
1344
1345 blob.sync().await.expect("Failed to sync blob");
1347 }
1348
1349 for (additional, partition) in partitions.iter().enumerate() {
1350 let (blob, len) = context
1352 .open(partition, name)
1353 .await
1354 .expect("Failed to open blob");
1355 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1356
1357 let read = blob
1359 .read_at(0, 10 + additional)
1360 .await
1361 .expect("Failed to read data");
1362 let read = read.coalesce();
1363 assert_eq!(&read.as_ref()[..5], b"Hello");
1364 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1365 }
1366 });
1367 }
1368
1369 fn test_blob_read_past_length<R: Runner>(runner: R)
1370 where
1371 R::Context: Storage,
1372 {
1373 runner.start(|context| async move {
1374 let partition = "test_partition";
1375 let name = b"test_blob_rw";
1376
1377 let (blob, _) = context
1379 .open(partition, name)
1380 .await
1381 .expect("Failed to open blob");
1382
1383 let result = blob.read_at(0, 10).await;
1385 assert!(result.is_err());
1386
1387 let data = b"Hello, Storage!".to_vec();
1389 blob.write_at(0, data)
1390 .await
1391 .expect("Failed to write to blob");
1392
1393 let result = blob.read_at(0, 20).await;
1395 assert!(result.is_err());
1396 })
1397 }
1398
1399 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1400 where
1401 R::Context: Spawner + Storage + Metrics,
1402 {
1403 runner.start(|context| async move {
1404 let partition = "test_partition";
1405 let name = b"test_blob_rw";
1406
1407 let (blob, _) = context
1409 .open(partition, name)
1410 .await
1411 .expect("Failed to open blob");
1412
1413 let data = b"Hello, Storage!";
1415 blob.write_at(0, data)
1416 .await
1417 .expect("Failed to write to blob");
1418
1419 blob.sync().await.expect("Failed to sync blob");
1421
1422 let check1 = context.with_label("check1").spawn({
1424 let blob = blob.clone();
1425 let data_len = data.len();
1426 move |_| async move {
1427 let read = blob
1428 .read_at(0, data_len)
1429 .await
1430 .expect("Failed to read from blob");
1431 assert_eq!(read.coalesce(), data);
1432 }
1433 });
1434 let check2 = context.with_label("check2").spawn({
1435 let blob = blob.clone();
1436 let data_len = data.len();
1437 move |_| async move {
1438 let read = blob
1439 .read_at(0, data_len)
1440 .await
1441 .expect("Failed to read from blob");
1442 assert_eq!(read.coalesce(), data);
1443 }
1444 });
1445
1446 let result = join!(check1, check2);
1448 assert!(result.0.is_ok());
1449 assert!(result.1.is_ok());
1450
1451 let read = blob
1453 .read_at(0, data.len())
1454 .await
1455 .expect("Failed to read from blob");
1456 assert_eq!(read.coalesce(), data);
1457
1458 drop(blob);
1460
1461 let buffer = context.encode();
1463 assert!(buffer.contains("open_blobs 0"));
1464 });
1465 }
1466
1467 fn test_shutdown<R: Runner>(runner: R)
1468 where
1469 R::Context: Spawner + Metrics + Clock,
1470 {
1471 let kill = 9;
1472 runner.start(|context| async move {
1473 let before = context
1475 .with_label("before")
1476 .spawn(move |context| async move {
1477 let mut signal = context.stopped();
1478 let value = (&mut signal).await.unwrap();
1479 assert_eq!(value, kill);
1480 drop(signal);
1481 });
1482
1483 let result = context.clone().stop(kill, None).await;
1485 assert!(result.is_ok());
1486
1487 let after = context
1489 .with_label("after")
1490 .spawn(move |context| async move {
1491 let value = context.stopped().await.unwrap();
1493 assert_eq!(value, kill);
1494 });
1495
1496 let result = join!(before, after);
1498 assert!(result.0.is_ok());
1499 assert!(result.1.is_ok());
1500 });
1501 }
1502
1503 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1504 where
1505 R::Context: Spawner + Metrics + Clock,
1506 {
1507 let kill = 42;
1508 runner.start(|context| async move {
1509 let (started_tx, mut started_rx) = mpsc::channel(3);
1510 let counter = Arc::new(AtomicU32::new(0));
1511
1512 let task = |cleanup_duration: Duration| {
1515 let context = context.clone();
1516 let counter = counter.clone();
1517 let started_tx = started_tx.clone();
1518 context.spawn(move |context| async move {
1519 let mut signal = context.stopped();
1521 started_tx.send(()).await.unwrap();
1522
1523 let value = (&mut signal).await.unwrap();
1525 assert_eq!(value, kill);
1526 context.sleep(cleanup_duration).await;
1527 counter.fetch_add(1, Ordering::SeqCst);
1528
1529 drop(signal);
1531 })
1532 };
1533
1534 let task1 = task(Duration::from_millis(10));
1535 let task2 = task(Duration::from_millis(20));
1536 let task3 = task(Duration::from_millis(30));
1537
1538 for _ in 0..3 {
1540 started_rx.recv().await.unwrap();
1541 }
1542
1543 context.stop(kill, None).await.unwrap();
1545 assert_eq!(counter.load(Ordering::SeqCst), 3);
1546
1547 let result = join!(task1, task2, task3);
1549 assert!(result.0.is_ok());
1550 assert!(result.1.is_ok());
1551 assert!(result.2.is_ok());
1552 });
1553 }
1554
1555 fn test_shutdown_timeout<R: Runner>(runner: R)
1556 where
1557 R::Context: Spawner + Metrics + Clock,
1558 {
1559 let kill = 42;
1560 runner.start(|context| async move {
1561 let (started_tx, started_rx) = oneshot::channel();
1563
1564 context.clone().spawn(move |context| async move {
1566 let signal = context.stopped();
1567 started_tx.send(()).unwrap();
1568 pending::<()>().await;
1569 signal.await.unwrap();
1570 });
1571
1572 started_rx.await.unwrap();
1574 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1575
1576 assert!(matches!(result, Err(Error::Timeout)));
1578 });
1579 }
1580
1581 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1582 where
1583 R::Context: Spawner + Metrics + Clock,
1584 {
1585 let kill1 = 42;
1586 let kill2 = 43;
1587
1588 runner.start(|context| async move {
1589 let (started_tx, started_rx) = oneshot::channel();
1590 let counter = Arc::new(AtomicU32::new(0));
1591
1592 let task = context.with_label("blocking_task").spawn({
1594 let counter = counter.clone();
1595 move |context| async move {
1596 let mut signal = context.stopped();
1598 started_tx.send(()).unwrap();
1599
1600 let value = (&mut signal).await.unwrap();
1602 assert_eq!(value, kill1);
1603 context.sleep(Duration::from_millis(50)).await;
1604
1605 counter.fetch_add(1, Ordering::SeqCst);
1607 drop(signal);
1608 }
1609 });
1610
1611 started_rx.await.unwrap();
1613
1614 let stop_task1 = context.clone().stop(kill1, None);
1617 pin_mut!(stop_task1);
1618 let stop_task2 = context.clone().stop(kill2, None);
1619 pin_mut!(stop_task2);
1620
1621 assert!(stop_task1.as_mut().now_or_never().is_none());
1623 assert!(stop_task2.as_mut().now_or_never().is_none());
1624
1625 assert!(stop_task1.await.is_ok());
1627 assert!(stop_task2.await.is_ok());
1628
1629 let sig = context.stopped().await;
1631 assert_eq!(sig.unwrap(), kill1);
1632
1633 let result = task.await;
1635 assert!(result.is_ok());
1636 assert_eq!(counter.load(Ordering::SeqCst), 1);
1637
1638 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1640 });
1641 }
1642
1643 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1644 where
1645 R::Context: Spawner + Metrics,
1646 {
1647 runner.start(|context| async move {
1648 context
1650 .with_label("before")
1651 .spawn(move |context| async move {
1652 let mut signal = context.stopped();
1653 let value = (&mut signal).await.unwrap();
1654
1655 assert_eq!(value, 42);
1657 drop(signal);
1658 });
1659
1660 reschedule().await;
1662 });
1663 }
1664
1665 fn test_spawn_dedicated<R: Runner>(runner: R)
1666 where
1667 R::Context: Spawner,
1668 {
1669 runner.start(|context| async move {
1670 let handle = context.dedicated().spawn(|_| async move { 42 });
1671 assert!(matches!(handle.await, Ok(42)));
1672 });
1673 }
1674
1675 fn test_spawn<R: Runner>(runner: R)
1676 where
1677 R::Context: Spawner + Clock,
1678 {
1679 runner.start(|context| async move {
1680 let child_handle = Arc::new(Mutex::new(None));
1681 let child_handle2 = child_handle.clone();
1682
1683 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1684 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1685 let parent_handle = context.spawn(move |context| async move {
1686 let handle = context.spawn(|_| async {});
1688
1689 *child_handle2.lock() = Some(handle);
1691
1692 parent_initialized_tx.send(()).unwrap();
1693
1694 parent_complete_rx.await.unwrap();
1696 });
1697
1698 parent_initialized_rx.await.unwrap();
1700
1701 let child_handle = child_handle.lock().take().unwrap();
1703 assert!(child_handle.await.is_ok());
1704
1705 parent_complete_tx.send(()).unwrap();
1707
1708 assert!(parent_handle.await.is_ok());
1710 });
1711 }
1712
1713 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1714 where
1715 R::Context: Spawner + Clock,
1716 {
1717 runner.start(|context| async move {
1718 let child_handle = Arc::new(Mutex::new(None));
1719 let child_handle2 = child_handle.clone();
1720
1721 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1722 let parent_handle = context.spawn(move |context| async move {
1723 let handle = context.spawn(|_| pending::<()>());
1725
1726 *child_handle2.lock() = Some(handle);
1728
1729 parent_initialized_tx.send(()).unwrap();
1730
1731 pending::<()>().await
1733 });
1734
1735 parent_initialized_rx.await.unwrap();
1737
1738 parent_handle.abort();
1740 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1741
1742 let child_handle = child_handle.lock().take().unwrap();
1744 assert!(matches!(child_handle.await, Err(Error::Closed)));
1745 });
1746 }
1747
1748 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1749 where
1750 R::Context: Spawner + Clock,
1751 {
1752 runner.start(|context| async move {
1753 let child_handle = Arc::new(Mutex::new(None));
1754 let child_handle2 = child_handle.clone();
1755
1756 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1757 let parent_handle = context.spawn(move |context| async move {
1758 let handle = context.spawn(|_| pending::<()>());
1760
1761 *child_handle2.lock() = Some(handle);
1763
1764 parent_complete_rx.await.unwrap();
1766 });
1767
1768 parent_complete_tx.send(()).unwrap();
1770
1771 assert!(parent_handle.await.is_ok());
1773
1774 let child_handle = child_handle.lock().take().unwrap();
1776 assert!(matches!(child_handle.await, Err(Error::Closed)));
1777 });
1778 }
1779
1780 fn test_spawn_cascading_abort<R: Runner>(runner: R)
1781 where
1782 R::Context: Spawner + Clock,
1783 {
1784 runner.start(|context| async move {
1785 let c0 = context.clone();
1795 let g0 = c0.clone();
1796 let g1 = c0.clone();
1797 let c1 = context.clone();
1798 let g2 = c1.clone();
1799 let g3 = c1.clone();
1800 let c2 = context.clone();
1801 let g4 = c2.clone();
1802 let g5 = c2.clone();
1803
1804 let handles = Arc::new(Mutex::new(Vec::new()));
1806 let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
1807 let root_task = context.spawn({
1808 let handles = handles.clone();
1809 move |_| async move {
1810 for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1811 {
1812 let handle = context.spawn({
1813 let handles = handles.clone();
1814 let initialized_tx = initialized_tx.clone();
1815 move |_| async move {
1816 for grandchild in grandchildren {
1817 let handle = grandchild.spawn(|_| async {
1818 pending::<()>().await;
1819 });
1820 handles.lock().push(handle);
1821 initialized_tx.send(()).await.unwrap();
1822 }
1823
1824 pending::<()>().await;
1825 }
1826 });
1827 handles.lock().push(handle);
1828 initialized_tx.send(()).await.unwrap();
1829 }
1830
1831 pending::<()>().await;
1832 }
1833 });
1834
1835 for _ in 0..9 {
1837 initialized_rx.recv().await.unwrap();
1838 }
1839
1840 assert_eq!(handles.lock().len(), 9);
1842
1843 root_task.abort();
1845 assert!(matches!(root_task.await, Err(Error::Closed)));
1846
1847 let handles = handles.lock().drain(..).collect::<Vec<_>>();
1849 for handle in handles {
1850 assert!(matches!(handle.await, Err(Error::Closed)));
1851 }
1852 });
1853 }
1854
1855 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1856 where
1857 R::Context: Spawner + Clock,
1858 {
1859 runner.start(|context| async move {
1860 let (child_started_tx, child_started_rx) = oneshot::channel();
1861 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1862 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1863 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1864 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1865 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1866 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1867
1868 let parent = context.spawn(move |context| async move {
1869 let child_handle = context.clone().spawn(|_| async move {
1871 child_started_tx.send(()).unwrap();
1872 child_complete_rx.await.unwrap();
1874 });
1875 assert!(
1876 child_handle_tx.send(child_handle).is_ok(),
1877 "child handle receiver dropped"
1878 );
1879
1880 let sibling_handle = context.clone().spawn(move |_| async move {
1882 sibling_started_tx.send(()).unwrap();
1883 sibling_complete_rx.await.unwrap();
1885 });
1886 assert!(
1887 sibling_handle_tx.send(sibling_handle).is_ok(),
1888 "sibling handle receiver dropped"
1889 );
1890
1891 parent_complete_rx.await.unwrap();
1893 });
1894
1895 child_started_rx.await.unwrap();
1897 sibling_started_rx.await.unwrap();
1898
1899 sibling_complete_tx.send(()).unwrap();
1901 assert!(sibling_handle_rx.await.is_ok());
1902
1903 child_complete_tx.send(()).unwrap();
1905 assert!(child_handle_rx.await.is_ok());
1906
1907 parent_complete_tx.send(()).unwrap();
1909 assert!(parent.await.is_ok());
1910 });
1911 }
1912
1913 fn test_spawn_clone_chain<R: Runner>(runner: R)
1914 where
1915 R::Context: Spawner + Clock,
1916 {
1917 runner.start(|context| async move {
1918 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1919 let (child_started_tx, child_started_rx) = oneshot::channel();
1920 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1921 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1922 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1923
1924 let parent = context.clone().spawn({
1925 move |context| async move {
1926 let child = context.clone().spawn({
1927 move |context| async move {
1928 let grandchild = context.clone().spawn({
1929 move |_| async move {
1930 grandchild_started_tx.send(()).unwrap();
1931 pending::<()>().await;
1932 }
1933 });
1934 assert!(
1935 grandchild_handle_tx.send(grandchild).is_ok(),
1936 "grandchild handle receiver dropped"
1937 );
1938 child_started_tx.send(()).unwrap();
1939 pending::<()>().await;
1940 }
1941 });
1942 assert!(
1943 child_handle_tx.send(child).is_ok(),
1944 "child handle receiver dropped"
1945 );
1946 parent_started_tx.send(()).unwrap();
1947 pending::<()>().await;
1948 }
1949 });
1950
1951 parent_started_rx.await.unwrap();
1952 child_started_rx.await.unwrap();
1953 grandchild_started_rx.await.unwrap();
1954
1955 let child_handle = child_handle_rx.await.unwrap();
1956 let grandchild_handle = grandchild_handle_rx.await.unwrap();
1957
1958 parent.abort();
1959 assert!(parent.await.is_err());
1960
1961 assert!(child_handle.await.is_err());
1962 assert!(grandchild_handle.await.is_err());
1963 });
1964 }
1965
1966 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1967 where
1968 R::Context: Spawner + Clock,
1969 {
1970 runner.start(|context| async move {
1971 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1972 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1973
1974 let parent = context.clone().spawn({
1975 move |context| async move {
1976 let clone1 = context.clone();
1977 let clone2 = clone1.clone();
1978 let clone3 = clone2.clone();
1979
1980 let leaf = clone3.clone().spawn({
1981 move |_| async move {
1982 leaf_started_tx.send(()).unwrap();
1983 pending::<()>().await;
1984 }
1985 });
1986
1987 leaf_handle_tx
1988 .send(leaf)
1989 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1990 pending::<()>().await;
1991 }
1992 });
1993
1994 leaf_started_rx.await.unwrap();
1995 let leaf_handle = leaf_handle_rx.await.unwrap();
1996
1997 parent.abort();
1998 assert!(parent.await.is_err());
1999 assert!(leaf_handle.await.is_err());
2000 });
2001 }
2002
2003 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
2004 where
2005 R::Context: Spawner,
2006 {
2007 runner.start(|context| async move {
2008 let context = if dedicated {
2009 context.dedicated()
2010 } else {
2011 context.shared(true)
2012 };
2013
2014 let handle = context.spawn(|_| async move { 42 });
2015 let result = handle.await;
2016 assert!(matches!(result, Ok(42)));
2017 });
2018 }
2019
2020 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2021 where
2022 R::Context: Spawner + Clock,
2023 {
2024 runner.start(|context| async move {
2025 let context = if dedicated {
2026 context.dedicated()
2027 } else {
2028 context.shared(true)
2029 };
2030
2031 context.clone().spawn(|_| async move {
2032 panic!("blocking task panicked");
2033 });
2034
2035 loop {
2037 context.sleep(Duration::from_millis(100)).await;
2038 }
2039 });
2040 }
2041
2042 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2043 where
2044 R::Context: Spawner + Clock,
2045 {
2046 let result: Result<(), Error> = runner.start(|context| async move {
2047 let context = if dedicated {
2048 context.dedicated()
2049 } else {
2050 context.shared(true)
2051 };
2052
2053 let handle = context.clone().spawn(|_| async move {
2054 panic!("blocking task panicked");
2055 });
2056 handle.await
2057 });
2058 assert!(matches!(result, Err(Error::Exited)));
2059 }
2060
2061 fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
2062 runner.start(|_| async move {
2063 let dropper = Arc::new(());
2065 let executor = deterministic::Runner::default();
2066 executor.start({
2067 let dropper = dropper.clone();
2068 move |context| async move {
2069 let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
2071 let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
2072 let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();
2073
2074 context.with_label("task1").spawn({
2076 let setup_tx = setup_tx.clone();
2077 let dropper = dropper.clone();
2078 move |_| async move {
2079 tx2.send(()).unwrap();
2081 rx1.recv().await.unwrap();
2082 setup_tx.send(()).unwrap();
2083
2084 while rx1.recv().await.is_some() {}
2086 drop(tx2);
2087 drop(dropper);
2088 }
2089 });
2090
2091 context.with_label("task2").spawn(move |_| async move {
2093 tx1.send(()).unwrap();
2095 rx2.recv().await.unwrap();
2096 setup_tx.send(()).unwrap();
2097
2098 while rx2.recv().await.is_some() {}
2100 drop(tx1);
2101 drop(dropper);
2102 });
2103
2104 setup_rx.recv().await.unwrap();
2106 setup_rx.recv().await.unwrap();
2107 }
2108 });
2109
2110 Arc::try_unwrap(dropper).expect("references remaining");
2112 });
2113 }
2114
2115 fn test_late_waker<R: Runner>(runner: R)
2116 where
2117 R::Context: Metrics + Spawner,
2118 {
2119 struct CaptureWaker {
2122 tx: Option<oneshot::Sender<Waker>>,
2123 sent: bool,
2124 }
2125 impl Future for CaptureWaker {
2126 type Output = ();
2127 fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
2128 if !self.sent {
2129 if let Some(tx) = self.tx.take() {
2130 let _ = tx.send(cx.waker().clone());
2132 }
2133 self.sent = true;
2134 }
2135 Poll::Pending
2136 }
2137 }
2138
2139 struct WakeOnDrop(Option<Waker>);
2141 impl Drop for WakeOnDrop {
2142 fn drop(&mut self) {
2143 if let Some(w) = self.0.take() {
2144 w.wake_by_ref();
2145 }
2146 }
2147 }
2148
2149 let holder = runner.start(|context| async move {
2151 let (tx, rx) = oneshot::channel::<Waker>();
2153
2154 context
2156 .with_label("capture_waker")
2157 .spawn(move |_| async move {
2158 CaptureWaker {
2159 tx: Some(tx),
2160 sent: false,
2161 }
2162 .await;
2163 });
2164
2165 utils::reschedule().await;
2167
2168 let waker = rx.await.expect("waker not received");
2170
2171 WakeOnDrop(Some(waker))
2173 });
2174
2175 drop(holder);
2178 }
2179
2180 fn test_metrics<R: Runner>(runner: R)
2181 where
2182 R::Context: Metrics,
2183 {
2184 runner.start(|context| async move {
2185 assert_eq!(context.label(), "");
2187
2188 let counter = Counter::<u64>::default();
2190 context.register("test", "test", counter.clone());
2191
2192 counter.inc();
2194
2195 let buffer = context.encode();
2197 assert!(buffer.contains("test_total 1"));
2198
2199 let context = context.with_label("nested");
2201 let nested_counter = Counter::<u64>::default();
2202 context.register("test", "test", nested_counter.clone());
2203
2204 nested_counter.inc();
2206
2207 let buffer = context.encode();
2209 assert!(buffer.contains("nested_test_total 1"));
2210 assert!(buffer.contains("test_total 1"));
2211 });
2212 }
2213
2214 fn test_metrics_with_attribute<R: Runner>(runner: R)
2215 where
2216 R::Context: Metrics,
2217 {
2218 runner.start(|context| async move {
2219 let ctx_epoch5 = context
2221 .with_label("consensus")
2222 .with_attribute("epoch", "e5");
2223
2224 let counter = Counter::<u64>::default();
2226 ctx_epoch5.register("votes", "vote count", counter.clone());
2227 counter.inc();
2228
2229 let buffer = context.encode();
2231 assert!(
2232 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2233 "Expected metric with epoch attribute, got: {}",
2234 buffer
2235 );
2236
2237 let ctx_epoch6 = context
2239 .with_label("consensus")
2240 .with_attribute("epoch", "e6");
2241 let counter2 = Counter::<u64>::default();
2242 ctx_epoch6.register("votes", "vote count", counter2.clone());
2243 counter2.inc();
2244 counter2.inc();
2245
2246 let buffer = context.encode();
2248 assert!(
2249 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2250 "Expected metric with epoch=e5, got: {}",
2251 buffer
2252 );
2253 assert!(
2254 buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
2255 "Expected metric with epoch=e6, got: {}",
2256 buffer
2257 );
2258
2259 assert_eq!(
2261 buffer.matches("# HELP consensus_votes").count(),
2262 1,
2263 "HELP should appear exactly once, got: {}",
2264 buffer
2265 );
2266 assert_eq!(
2267 buffer.matches("# TYPE consensus_votes").count(),
2268 1,
2269 "TYPE should appear exactly once, got: {}",
2270 buffer
2271 );
2272
2273 let ctx_multi = context
2275 .with_label("engine")
2276 .with_attribute("region", "us")
2277 .with_attribute("instance", "i1");
2278 let counter3 = Counter::<u64>::default();
2279 ctx_multi.register("requests", "request count", counter3.clone());
2280 counter3.inc();
2281
2282 let buffer = context.encode();
2283 assert!(
2284 buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
2285 "Expected metric with sorted attributes, got: {}",
2286 buffer
2287 );
2288 });
2289 }
2290
2291 #[test]
2292 fn test_deterministic_metrics_with_attribute() {
2293 let executor = deterministic::Runner::default();
2294 test_metrics_with_attribute(executor);
2295 }
2296
2297 #[test]
2298 fn test_tokio_metrics_with_attribute() {
2299 let runner = tokio::Runner::default();
2300 test_metrics_with_attribute(runner);
2301 }
2302
2303 fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2304 where
2305 R::Context: Metrics,
2306 {
2307 runner.start(|context| async move {
2308 let ctx = context
2310 .with_label("orchestrator")
2311 .with_attribute("epoch", "e5")
2312 .with_label("engine");
2313
2314 let counter = Counter::<u64>::default();
2316 ctx.register("votes", "vote count", counter.clone());
2317 counter.inc();
2318
2319 let buffer = context.encode();
2321 assert!(
2322 buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2323 "Expected metric with preserved epoch attribute, got: {}",
2324 buffer
2325 );
2326
2327 let ctx2 = context
2329 .with_label("outer")
2330 .with_attribute("region", "us")
2331 .with_label("middle")
2332 .with_attribute("az", "east")
2333 .with_label("inner");
2334
2335 let counter2 = Counter::<u64>::default();
2336 ctx2.register("requests", "request count", counter2.clone());
2337 counter2.inc();
2338 counter2.inc();
2339
2340 let buffer = context.encode();
2341 assert!(
2342 buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2343 "Expected metric with all attributes preserved and sorted, got: {}",
2344 buffer
2345 );
2346 });
2347 }
2348
2349 #[test]
2350 fn test_deterministic_metrics_attribute_with_nested_label() {
2351 let executor = deterministic::Runner::default();
2352 test_metrics_attribute_with_nested_label(executor);
2353 }
2354
2355 #[test]
2356 fn test_tokio_metrics_attribute_with_nested_label() {
2357 let runner = tokio::Runner::default();
2358 test_metrics_attribute_with_nested_label(runner);
2359 }
2360
2361 fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2362 where
2363 R::Context: Metrics,
2364 {
2365 runner.start(|context| async move {
2366 let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
2368 let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);
2369
2370 let c1 = Counter::<u64>::default();
2372 ctx_a.register("requests", "help", c1);
2373
2374 let c2 = Counter::<u64>::default();
2376 ctx_b.register("requests", "help", c2);
2377
2378 let c3 = Counter::<u64>::default();
2380 ctx_a.register("errors", "help", c3);
2381
2382 let output = context.encode();
2383
2384 assert!(
2386 output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2387 "ctx_a requests should have epoch=1: {output}"
2388 );
2389 assert!(
2390 output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2391 "ctx_a errors should have epoch=1: {output}"
2392 );
2393 assert!(
2394 !output.contains("component_a_requests_total{epoch=\"2\"}"),
2395 "ctx_a requests should not have epoch=2: {output}"
2396 );
2397
2398 assert!(
2400 output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2401 "ctx_b should have epoch=2: {output}"
2402 );
2403 assert!(
2404 !output.contains("component_b_requests_total{epoch=\"1\"}"),
2405 "ctx_b should not have epoch=1: {output}"
2406 );
2407 });
2408 }
2409
2410 #[test]
2411 fn test_deterministic_metrics_attributes_isolated_between_contexts() {
2412 let executor = deterministic::Runner::default();
2413 test_metrics_attributes_isolated_between_contexts(executor);
2414 }
2415
2416 #[test]
2417 fn test_tokio_metrics_attributes_isolated_between_contexts() {
2418 let runner = tokio::Runner::default();
2419 test_metrics_attributes_isolated_between_contexts(runner);
2420 }
2421
2422 fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2423 where
2424 R::Context: Metrics,
2425 {
2426 runner.start(|context| async move {
2427 let ctx_ab = context
2429 .with_label("service")
2430 .with_attribute("region", "us")
2431 .with_attribute("env", "prod");
2432
2433 let ctx_ba = context
2434 .with_label("service")
2435 .with_attribute("env", "prod")
2436 .with_attribute("region", "us");
2437
2438 let c1 = Counter::<u64>::default();
2440 ctx_ab.register("requests", "help", c1.clone());
2441 c1.inc();
2442
2443 let c2 = Counter::<u64>::default();
2445 ctx_ba.register("errors", "help", c2.clone());
2446 c2.inc();
2447 c2.inc();
2448
2449 let output = context.encode();
2450
2451 assert!(
2453 output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2454 "requests should have sorted labels: {output}"
2455 );
2456 assert!(
2457 output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2458 "errors should have sorted labels: {output}"
2459 );
2460
2461 assert!(
2463 !output.contains("region=\"us\",env=\"prod\""),
2464 "should not have unsorted label order: {output}"
2465 );
2466 });
2467 }
2468
2469 #[test]
2470 fn test_deterministic_metrics_attributes_sorted_deterministically() {
2471 let executor = deterministic::Runner::default();
2472 test_metrics_attributes_sorted_deterministically(executor);
2473 }
2474
2475 #[test]
2476 fn test_tokio_metrics_attributes_sorted_deterministically() {
2477 let runner = tokio::Runner::default();
2478 test_metrics_attributes_sorted_deterministically(runner);
2479 }
2480
2481 fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
2482 where
2483 R::Context: Metrics,
2484 {
2485 runner.start(|context| async move {
2486 let svc_a = context.with_label("service_a");
2488
2489 let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);
2491
2492 let svc_b_worker = context.with_label("service_b").with_label("worker");
2494
2495 let svc_b_worker_shard = context
2497 .with_label("service_b")
2498 .with_label("worker")
2499 .with_attribute("shard", 99);
2500
2501 let svc_b_manager = context.with_label("service_b").with_label("manager");
2503
2504 let svc_c = context.with_label("service_c");
2506
2507 let c1 = Counter::<u64>::default();
2509 svc_a.register("requests", "help", c1);
2510
2511 let c2 = Counter::<u64>::default();
2512 svc_a_v2.register("requests", "help", c2);
2513
2514 let c3 = Counter::<u64>::default();
2515 svc_b_worker.register("tasks", "help", c3);
2516
2517 let c4 = Counter::<u64>::default();
2518 svc_b_worker_shard.register("tasks", "help", c4);
2519
2520 let c5 = Counter::<u64>::default();
2521 svc_b_manager.register("decisions", "help", c5);
2522
2523 let c6 = Counter::<u64>::default();
2524 svc_c.register("requests", "help", c6);
2525
2526 let output = context.encode();
2527
2528 assert!(
2530 output.contains("service_a_requests_total 0"),
2531 "svc_a plain should exist: {output}"
2532 );
2533 assert!(
2534 output.contains("service_a_requests_total{version=\"2\"} 0"),
2535 "svc_a_v2 should have version=2: {output}"
2536 );
2537
2538 assert!(
2540 output.contains("service_b_worker_tasks_total 0"),
2541 "svc_b_worker plain should exist: {output}"
2542 );
2543 assert!(
2544 output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
2545 "svc_b_worker_shard should have shard=99: {output}"
2546 );
2547
2548 assert!(
2550 output.contains("service_b_manager_decisions_total 0"),
2551 "svc_b_manager should have no attributes: {output}"
2552 );
2553 assert!(
2554 !output.contains("service_b_manager_decisions_total{"),
2555 "svc_b_manager should have no attributes at all: {output}"
2556 );
2557
2558 assert!(
2560 output.contains("service_c_requests_total 0"),
2561 "svc_c should have no attributes: {output}"
2562 );
2563 assert!(
2564 !output.contains("service_c_requests_total{"),
2565 "svc_c should have no attributes at all: {output}"
2566 );
2567
2568 assert!(
2570 !output.contains("service_b_manager_decisions_total{shard="),
2571 "svc_b_manager should not have shard: {output}"
2572 );
2573 assert!(
2574 !output.contains("service_a_requests_total{shard="),
2575 "svc_a should not have shard: {output}"
2576 );
2577 assert!(
2578 !output.contains("service_c_requests_total{version="),
2579 "svc_c should not have version: {output}"
2580 );
2581 });
2582 }
2583
2584 #[test]
2585 fn test_deterministic_metrics_nested_labels_with_attributes() {
2586 let executor = deterministic::Runner::default();
2587 test_metrics_nested_labels_with_attributes(executor);
2588 }
2589
2590 #[test]
2591 fn test_tokio_metrics_nested_labels_with_attributes() {
2592 let runner = tokio::Runner::default();
2593 test_metrics_nested_labels_with_attributes(runner);
2594 }
2595
2596 fn test_metrics_family_with_attributes<R: Runner>(runner: R)
2597 where
2598 R::Context: Metrics,
2599 {
2600 runner.start(|context| async move {
2601 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
2602 struct RequestLabels {
2603 method: String,
2604 status: u16,
2605 }
2606
2607 let ctx = context
2609 .with_label("api")
2610 .with_attribute("region", "us_east")
2611 .with_attribute("env", "prod");
2612
2613 let requests: Family<RequestLabels, Counter<u64>> = Family::default();
2615 ctx.register("requests", "HTTP requests", requests.clone());
2616
2617 requests
2619 .get_or_create(&RequestLabels {
2620 method: "GET".to_string(),
2621 status: 200,
2622 })
2623 .inc();
2624 requests
2625 .get_or_create(&RequestLabels {
2626 method: "POST".to_string(),
2627 status: 201,
2628 })
2629 .inc();
2630 requests
2631 .get_or_create(&RequestLabels {
2632 method: "GET".to_string(),
2633 status: 404,
2634 })
2635 .inc();
2636
2637 let output = context.encode();
2638
2639 assert!(
2643 output.contains(
2644 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
2645 ),
2646 "GET 200 should have merged labels: {output}"
2647 );
2648 assert!(
2649 output.contains(
2650 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
2651 ),
2652 "POST 201 should have merged labels: {output}"
2653 );
2654 assert!(
2655 output.contains(
2656 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
2657 ),
2658 "GET 404 should have merged labels: {output}"
2659 );
2660
2661 let ctx_plain = context.with_label("api_plain");
2663 let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
2664 ctx_plain.register("requests", "HTTP requests", plain_requests.clone());
2665
2666 plain_requests
2667 .get_or_create(&RequestLabels {
2668 method: "DELETE".to_string(),
2669 status: 204,
2670 })
2671 .inc();
2672
2673 let output = context.encode();
2674
2675 assert!(
2677 output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
2678 "plain DELETE should have only family labels: {output}"
2679 );
2680 assert!(
2681 !output.contains("api_plain_requests_total{env="),
2682 "plain should not have env attribute: {output}"
2683 );
2684 assert!(
2685 !output.contains("api_plain_requests_total{region="),
2686 "plain should not have region attribute: {output}"
2687 );
2688 });
2689 }
2690
2691 #[test]
2692 fn test_deterministic_metrics_family_with_attributes() {
2693 let executor = deterministic::Runner::default();
2694 test_metrics_family_with_attributes(executor);
2695 }
2696
2697 #[test]
2698 fn test_tokio_metrics_family_with_attributes() {
2699 let runner = tokio::Runner::default();
2700 test_metrics_family_with_attributes(runner);
2701 }
2702
2703 fn test_with_scope_register_and_encode<R: Runner>(runner: R)
2704 where
2705 R::Context: Metrics,
2706 {
2707 runner.start(|context| async move {
2708 let scoped = context.with_label("engine").with_scope();
2709 let counter = Counter::<u64>::default();
2710 scoped.register("votes", "vote count", counter.clone());
2711 counter.inc();
2712
2713 let buffer = context.encode();
2714 assert!(
2715 buffer.contains("engine_votes_total 1"),
2716 "scoped metric should appear in encode: {buffer}"
2717 );
2718 });
2719 }
2720
2721 #[test]
2722 fn test_deterministic_with_scope_register_and_encode() {
2723 let executor = deterministic::Runner::default();
2724 test_with_scope_register_and_encode(executor);
2725 }
2726
2727 #[test]
2728 fn test_tokio_with_scope_register_and_encode() {
2729 let runner = tokio::Runner::default();
2730 test_with_scope_register_and_encode(runner);
2731 }
2732
2733 fn test_with_scope_drop_removes_metrics<R: Runner>(runner: R)
2734 where
2735 R::Context: Metrics,
2736 {
2737 runner.start(|context| async move {
2738 let permanent = Counter::<u64>::default();
2740 context.with_label("permanent").register(
2741 "counter",
2742 "permanent counter",
2743 permanent.clone(),
2744 );
2745 permanent.inc();
2746
2747 let scoped = context.with_label("engine").with_scope();
2749 let counter = Counter::<u64>::default();
2750 scoped.register("votes", "vote count", counter.clone());
2751 counter.inc();
2752
2753 let buffer = context.encode();
2755 assert!(buffer.contains("permanent_counter_total 1"));
2756 assert!(buffer.contains("engine_votes_total 1"));
2757
2758 drop(scoped);
2760
2761 let buffer = context.encode();
2763 assert!(
2764 buffer.contains("permanent_counter_total 1"),
2765 "permanent metric should survive scope drop: {buffer}"
2766 );
2767 assert!(
2768 !buffer.contains("engine_votes"),
2769 "scoped metric should be removed after scope drop: {buffer}"
2770 );
2771 });
2772 }
2773
2774 #[test]
2775 fn test_deterministic_with_scope_drop_removes_metrics() {
2776 let executor = deterministic::Runner::default();
2777 test_with_scope_drop_removes_metrics(executor);
2778 }
2779
2780 #[test]
2781 fn test_tokio_with_scope_drop_removes_metrics() {
2782 let runner = tokio::Runner::default();
2783 test_with_scope_drop_removes_metrics(runner);
2784 }
2785
2786 fn test_with_scope_attributes<R: Runner>(runner: R)
2787 where
2788 R::Context: Metrics,
2789 {
2790 runner.start(|context| async move {
2791 let epoch1 = context
2793 .with_label("engine")
2794 .with_attribute("epoch", 1)
2795 .with_scope();
2796 let c1 = Counter::<u64>::default();
2797 epoch1.register("votes", "vote count", c1.clone());
2798 c1.inc();
2799
2800 let epoch2 = context
2801 .with_label("engine")
2802 .with_attribute("epoch", 2)
2803 .with_scope();
2804 let c2 = Counter::<u64>::default();
2805 epoch2.register("votes", "vote count", c2.clone());
2806 c2.inc();
2807 c2.inc();
2808
2809 let buffer = context.encode();
2811 assert!(buffer.contains("engine_votes_total{epoch=\"1\"} 1"));
2812 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2813
2814 assert_eq!(
2816 buffer.matches("# HELP engine_votes").count(),
2817 1,
2818 "HELP should appear once: {buffer}"
2819 );
2820 assert_eq!(
2821 buffer.matches("# TYPE engine_votes").count(),
2822 1,
2823 "TYPE should appear once: {buffer}"
2824 );
2825
2826 drop(epoch1);
2828 let buffer = context.encode();
2829 assert!(
2830 !buffer.contains("epoch=\"1\""),
2831 "epoch 1 should be gone: {buffer}"
2832 );
2833 assert!(buffer.contains("engine_votes_total{epoch=\"2\"} 2"));
2834
2835 drop(epoch2);
2837 let buffer = context.encode();
2838 assert!(
2839 !buffer.contains("engine_votes"),
2840 "all epoch metrics should be gone: {buffer}"
2841 );
2842 });
2843 }
2844
2845 #[test]
2846 fn test_deterministic_with_scope_attributes() {
2847 let executor = deterministic::Runner::default();
2848 test_with_scope_attributes(executor);
2849 }
2850
2851 #[test]
2852 fn test_tokio_with_scope_attributes() {
2853 let runner = tokio::Runner::default();
2854 test_with_scope_attributes(runner);
2855 }
2856
2857 fn test_with_scope_inherits_on_with_label<R: Runner>(runner: R)
2858 where
2859 R::Context: Metrics,
2860 {
2861 runner.start(|context| async move {
2862 let scoped = context.with_label("engine").with_scope();
2863
2864 let child = scoped.with_label("batcher");
2866 let counter = Counter::<u64>::default();
2867 child.register("msgs", "message count", counter.clone());
2868 counter.inc();
2869
2870 let buffer = context.encode();
2871 assert!(buffer.contains("engine_batcher_msgs_total 1"));
2872
2873 drop(child);
2875 drop(scoped);
2876 let buffer = context.encode();
2877 assert!(
2878 !buffer.contains("engine_batcher_msgs"),
2879 "child metric should be removed with scope: {buffer}"
2880 );
2881 });
2882 }
2883
2884 #[test]
2885 fn test_deterministic_with_scope_inherits_on_with_label() {
2886 let executor = deterministic::Runner::default();
2887 test_with_scope_inherits_on_with_label(executor);
2888 }
2889
2890 #[test]
2891 fn test_tokio_with_scope_inherits_on_with_label() {
2892 let runner = tokio::Runner::default();
2893 test_with_scope_inherits_on_with_label(runner);
2894 }
2895
2896 fn test_multiple_scopes<R: Runner>(runner: R)
2897 where
2898 R::Context: Metrics,
2899 {
2900 runner.start(|context| async move {
2901 let ctx_a = context.with_label("a").with_scope();
2902 let ctx_b = context.with_label("b").with_scope();
2903
2904 let ca = Counter::<u64>::default();
2905 ctx_a.register("counter", "a counter", ca.clone());
2906 ca.inc();
2907
2908 let cb = Counter::<u64>::default();
2909 ctx_b.register("counter", "b counter", cb.clone());
2910 cb.inc();
2911 cb.inc();
2912
2913 let buffer = context.encode();
2914 assert!(buffer.contains("a_counter_total 1"));
2915 assert!(buffer.contains("b_counter_total 2"));
2916
2917 drop(ctx_a);
2919 let buffer = context.encode();
2920 assert!(!buffer.contains("a_counter"));
2921 assert!(buffer.contains("b_counter_total 2"));
2922
2923 drop(ctx_b);
2925 let buffer = context.encode();
2926 assert!(!buffer.contains("b_counter"));
2927 });
2928 }
2929
2930 #[test]
2931 fn test_deterministic_multiple_scopes() {
2932 let executor = deterministic::Runner::default();
2933 test_multiple_scopes(executor);
2934 }
2935
2936 #[test]
2937 fn test_tokio_multiple_scopes() {
2938 let runner = tokio::Runner::default();
2939 test_multiple_scopes(runner);
2940 }
2941
2942 fn test_encode_single_eof<R: Runner>(runner: R)
2943 where
2944 R::Context: Metrics,
2945 {
2946 runner.start(|context| async move {
2947 let root_counter = Counter::<u64>::default();
2948 context.register("root", "root metric", root_counter.clone());
2949 root_counter.inc();
2950
2951 let scoped = context.with_label("engine").with_scope();
2952 let scoped_counter = Counter::<u64>::default();
2953 scoped.register("ops", "scoped metric", scoped_counter.clone());
2954 scoped_counter.inc();
2955
2956 let buffer = context.encode();
2957 assert!(
2958 buffer.contains("root_total 1"),
2959 "root metric missing: {buffer}"
2960 );
2961 assert!(
2962 buffer.contains("engine_ops_total 1"),
2963 "scoped metric missing: {buffer}"
2964 );
2965 assert_eq!(
2966 buffer.matches("# EOF").count(),
2967 1,
2968 "expected exactly one EOF marker: {buffer}"
2969 );
2970 assert!(
2971 buffer.ends_with("# EOF\n"),
2972 "EOF must be the last line: {buffer}"
2973 );
2974 });
2975 }
2976
2977 #[test]
2978 fn test_deterministic_encode_single_eof() {
2979 let executor = deterministic::Runner::default();
2980 test_encode_single_eof(executor);
2981 }
2982
2983 #[test]
2984 fn test_tokio_encode_single_eof() {
2985 let runner = tokio::Runner::default();
2986 test_encode_single_eof(runner);
2987 }
2988
2989 fn test_with_scope_nested_inherits<R: Runner>(runner: R)
2990 where
2991 R::Context: Metrics,
2992 {
2993 runner.start(|context| async move {
2994 let scoped = context.with_label("engine").with_scope();
2995
2996 let nested = scoped.with_scope();
2998 let counter = Counter::<u64>::default();
2999 nested.register("votes", "vote count", counter.clone());
3000 counter.inc();
3001
3002 let buffer = context.encode();
3003 assert!(
3004 buffer.contains("engine_votes_total 1"),
3005 "nested scope should inherit parent scope: {buffer}"
3006 );
3007
3008 drop(nested);
3011 let buffer = context.encode();
3012 assert!(
3013 buffer.contains("engine_votes_total 1"),
3014 "metrics should survive as long as any scope clone exists: {buffer}"
3015 );
3016
3017 drop(scoped);
3019 let buffer = context.encode();
3020 assert!(
3021 !buffer.contains("engine_votes"),
3022 "metrics should be removed when all scope clones are dropped: {buffer}"
3023 );
3024 });
3025 }
3026
3027 #[test]
3028 fn test_deterministic_with_scope_nested_inherits() {
3029 let executor = deterministic::Runner::default();
3030 test_with_scope_nested_inherits(executor);
3031 }
3032
3033 #[test]
3034 fn test_tokio_with_scope_nested_inherits() {
3035 let runner = tokio::Runner::default();
3036 test_with_scope_nested_inherits(runner);
3037 }
3038
3039 #[test]
3040 #[should_panic(expected = "duplicate metric:")]
3041 fn test_deterministic_reregister_after_scope_drop() {
3042 let executor = deterministic::Runner::default();
3043 executor.start(|context| async move {
3044 let scoped = context
3045 .with_label("engine")
3046 .with_attribute("epoch", 1)
3047 .with_scope();
3048 let c1 = Counter::<u64>::default();
3049 scoped.register("votes", "vote count", c1);
3050 drop(scoped);
3051
3052 let scoped2 = context
3054 .with_label("engine")
3055 .with_attribute("epoch", 1)
3056 .with_scope();
3057 let c2 = Counter::<u64>::default();
3058 scoped2.register("votes", "vote count", c2);
3059 });
3060 }
3061
3062 fn test_with_scope_family_with_attributes<R: Runner>(runner: R)
3063 where
3064 R::Context: Metrics,
3065 {
3066 #[derive(Clone, Debug, Hash, PartialEq, Eq)]
3067 struct Peer {
3068 name: String,
3069 }
3070 impl EncodeLabelSet for Peer {
3071 fn encode(
3072 &self,
3073 encoder: &mut prometheus_client::encoding::LabelSetEncoder<'_>,
3074 ) -> Result<(), std::fmt::Error> {
3075 let mut label = encoder.encode_label();
3076 let mut key = label.encode_label_key()?;
3077 EncodeLabelKey::encode(&"peer", &mut key)?;
3078 let mut value = key.encode_label_value()?;
3079 EncodeLabelValue::encode(&self.name.as_str(), &mut value)?;
3080 value.finish()
3081 }
3082 }
3083
3084 runner.start(|context| async move {
3085 let scoped = context
3086 .with_label("batcher")
3087 .with_attribute("epoch", 1)
3088 .with_scope();
3089
3090 let family: Family<Peer, Counter> = Family::default();
3091 scoped.register("votes", "votes per peer", family.clone());
3092 family
3093 .get_or_create(&Peer {
3094 name: "alice".into(),
3095 })
3096 .inc();
3097 family.get_or_create(&Peer { name: "bob".into() }).inc();
3098
3099 let buffer = context.encode();
3100 assert!(
3101 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"alice\"} 1"),
3102 "family with attributes should combine labels: {buffer}"
3103 );
3104 assert!(
3105 buffer.contains("batcher_votes_total{epoch=\"1\",peer=\"bob\"} 1"),
3106 "family with attributes should combine labels: {buffer}"
3107 );
3108
3109 drop(scoped);
3110 let buffer = context.encode();
3111 assert!(
3112 !buffer.contains("batcher_votes"),
3113 "family metrics should be removed: {buffer}"
3114 );
3115 });
3116 }
3117
3118 #[test]
3119 fn test_deterministic_with_scope_family_with_attributes() {
3120 let executor = deterministic::Runner::default();
3121 test_with_scope_family_with_attributes(executor);
3122 }
3123
3124 #[test]
3125 fn test_tokio_with_scope_family_with_attributes() {
3126 let runner = tokio::Runner::default();
3127 test_with_scope_family_with_attributes(runner);
3128 }
3129
3130 #[test]
3131 fn test_deterministic_future() {
3132 let runner = deterministic::Runner::default();
3133 test_error_future(runner);
3134 }
3135
3136 #[test]
3137 fn test_deterministic_clock_sleep() {
3138 let executor = deterministic::Runner::default();
3139 test_clock_sleep(executor);
3140 }
3141
3142 #[test]
3143 fn test_deterministic_clock_sleep_until() {
3144 let executor = deterministic::Runner::default();
3145 test_clock_sleep_until(executor);
3146 }
3147
3148 #[test]
3149 fn test_deterministic_clock_timeout() {
3150 let executor = deterministic::Runner::default();
3151 test_clock_timeout(executor);
3152 }
3153
3154 #[test]
3155 fn test_deterministic_root_finishes() {
3156 let executor = deterministic::Runner::default();
3157 test_root_finishes(executor);
3158 }
3159
3160 #[test]
3161 fn test_deterministic_spawn_after_abort() {
3162 let executor = deterministic::Runner::default();
3163 test_spawn_after_abort(executor);
3164 }
3165
3166 #[test]
3167 fn test_deterministic_spawn_abort() {
3168 let executor = deterministic::Runner::default();
3169 test_spawn_abort(executor, false, false);
3170 }
3171
3172 #[test]
3173 #[should_panic(expected = "blah")]
3174 fn test_deterministic_panic_aborts_root() {
3175 let runner = deterministic::Runner::default();
3176 test_panic_aborts_root(runner);
3177 }
3178
3179 #[test]
3180 #[should_panic(expected = "blah")]
3181 fn test_deterministic_panic_aborts_root_caught() {
3182 let cfg = deterministic::Config::default().with_catch_panics(true);
3183 let runner = deterministic::Runner::new(cfg);
3184 test_panic_aborts_root(runner);
3185 }
3186
3187 #[test]
3188 #[should_panic(expected = "blah")]
3189 fn test_deterministic_panic_aborts_spawn() {
3190 let executor = deterministic::Runner::default();
3191 test_panic_aborts_spawn(executor);
3192 }
3193
3194 #[test]
3195 fn test_deterministic_panic_aborts_spawn_caught() {
3196 let cfg = deterministic::Config::default().with_catch_panics(true);
3197 let executor = deterministic::Runner::new(cfg);
3198 test_panic_aborts_spawn_caught(executor);
3199 }
3200
3201 #[test]
3202 #[should_panic(expected = "boom")]
3203 fn test_deterministic_multiple_panics() {
3204 let executor = deterministic::Runner::default();
3205 test_multiple_panics(executor);
3206 }
3207
3208 #[test]
3209 fn test_deterministic_multiple_panics_caught() {
3210 let cfg = deterministic::Config::default().with_catch_panics(true);
3211 let executor = deterministic::Runner::new(cfg);
3212 test_multiple_panics_caught(executor);
3213 }
3214
3215 #[test]
3216 fn test_deterministic_select() {
3217 let executor = deterministic::Runner::default();
3218 test_select(executor);
3219 }
3220
3221 #[test]
3222 fn test_deterministic_select_loop() {
3223 let executor = deterministic::Runner::default();
3224 test_select_loop(executor);
3225 }
3226
3227 #[test]
3228 fn test_deterministic_storage_operations() {
3229 let executor = deterministic::Runner::default();
3230 test_storage_operations(executor);
3231 }
3232
3233 #[test]
3234 fn test_deterministic_blob_read_write() {
3235 let executor = deterministic::Runner::default();
3236 test_blob_read_write(executor);
3237 }
3238
3239 #[test]
3240 fn test_deterministic_blob_resize() {
3241 let executor = deterministic::Runner::default();
3242 test_blob_resize(executor);
3243 }
3244
3245 #[test]
3246 fn test_deterministic_many_partition_read_write() {
3247 let executor = deterministic::Runner::default();
3248 test_many_partition_read_write(executor);
3249 }
3250
3251 #[test]
3252 fn test_deterministic_blob_read_past_length() {
3253 let executor = deterministic::Runner::default();
3254 test_blob_read_past_length(executor);
3255 }
3256
3257 #[test]
3258 fn test_deterministic_blob_clone_and_concurrent_read() {
3259 let executor = deterministic::Runner::default();
3261 test_blob_clone_and_concurrent_read(executor);
3262 }
3263
3264 #[test]
3265 fn test_deterministic_shutdown() {
3266 let executor = deterministic::Runner::default();
3267 test_shutdown(executor);
3268 }
3269
3270 #[test]
3271 fn test_deterministic_shutdown_multiple_signals() {
3272 let executor = deterministic::Runner::default();
3273 test_shutdown_multiple_signals(executor);
3274 }
3275
3276 #[test]
3277 fn test_deterministic_shutdown_timeout() {
3278 let executor = deterministic::Runner::default();
3279 test_shutdown_timeout(executor);
3280 }
3281
3282 #[test]
3283 fn test_deterministic_shutdown_multiple_stop_calls() {
3284 let executor = deterministic::Runner::default();
3285 test_shutdown_multiple_stop_calls(executor);
3286 }
3287
3288 #[test]
3289 fn test_deterministic_unfulfilled_shutdown() {
3290 let executor = deterministic::Runner::default();
3291 test_unfulfilled_shutdown(executor);
3292 }
3293
3294 #[test]
3295 fn test_deterministic_spawn_dedicated() {
3296 let executor = deterministic::Runner::default();
3297 test_spawn_dedicated(executor);
3298 }
3299
3300 #[test]
3301 fn test_deterministic_spawn() {
3302 let runner = deterministic::Runner::default();
3303 test_spawn(runner);
3304 }
3305
3306 #[test]
3307 fn test_deterministic_spawn_abort_on_parent_abort() {
3308 let runner = deterministic::Runner::default();
3309 test_spawn_abort_on_parent_abort(runner);
3310 }
3311
3312 #[test]
3313 fn test_deterministic_spawn_abort_on_parent_completion() {
3314 let runner = deterministic::Runner::default();
3315 test_spawn_abort_on_parent_completion(runner);
3316 }
3317
3318 #[test]
3319 fn test_deterministic_spawn_cascading_abort() {
3320 let runner = deterministic::Runner::default();
3321 test_spawn_cascading_abort(runner);
3322 }
3323
3324 #[test]
3325 fn test_deterministic_child_survives_sibling_completion() {
3326 let runner = deterministic::Runner::default();
3327 test_child_survives_sibling_completion(runner);
3328 }
3329
3330 #[test]
3331 fn test_deterministic_spawn_clone_chain() {
3332 let runner = deterministic::Runner::default();
3333 test_spawn_clone_chain(runner);
3334 }
3335
3336 #[test]
3337 fn test_deterministic_spawn_sparse_clone_chain() {
3338 let runner = deterministic::Runner::default();
3339 test_spawn_sparse_clone_chain(runner);
3340 }
3341
3342 #[test]
3343 fn test_deterministic_spawn_blocking() {
3344 for dedicated in [false, true] {
3345 let executor = deterministic::Runner::default();
3346 test_spawn_blocking(executor, dedicated);
3347 }
3348 }
3349
3350 #[test]
3351 #[should_panic(expected = "blocking task panicked")]
3352 fn test_deterministic_spawn_blocking_panic() {
3353 for dedicated in [false, true] {
3354 let executor = deterministic::Runner::default();
3355 test_spawn_blocking_panic(executor, dedicated);
3356 }
3357 }
3358
3359 #[test]
3360 fn test_deterministic_spawn_blocking_panic_caught() {
3361 for dedicated in [false, true] {
3362 let cfg = deterministic::Config::default().with_catch_panics(true);
3363 let executor = deterministic::Runner::new(cfg);
3364 test_spawn_blocking_panic_caught(executor, dedicated);
3365 }
3366 }
3367
3368 #[test]
3369 fn test_deterministic_spawn_blocking_abort() {
3370 for (dedicated, blocking) in [(false, true), (true, false)] {
3371 let executor = deterministic::Runner::default();
3372 test_spawn_abort(executor, dedicated, blocking);
3373 }
3374 }
3375
3376 #[test]
3377 fn test_deterministic_circular_reference_prevents_cleanup() {
3378 let executor = deterministic::Runner::default();
3379 test_circular_reference_prevents_cleanup(executor);
3380 }
3381
3382 #[test]
3383 fn test_deterministic_late_waker() {
3384 let executor = deterministic::Runner::default();
3385 test_late_waker(executor);
3386 }
3387
3388 #[test]
3389 fn test_deterministic_metrics() {
3390 let executor = deterministic::Runner::default();
3391 test_metrics(executor);
3392 }
3393
3394 #[test_collect_traces]
3395 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
3396 let executor = deterministic::Runner::new(deterministic::Config::default());
3397 executor.start(|context| async move {
3398 context
3399 .with_label("test")
3400 .instrumented()
3401 .spawn(|context| async move {
3402 tracing::info!(field = "test field", "test log");
3403
3404 context
3405 .with_label("inner")
3406 .instrumented()
3407 .spawn(|_| async move {
3408 tracing::info!("inner log");
3409 })
3410 .await
3411 .unwrap();
3412 })
3413 .await
3414 .unwrap();
3415 });
3416
3417 let info_traces = traces.get_by_level(Level::INFO);
3418 assert_eq!(info_traces.len(), 2);
3419
3420 info_traces
3422 .expect_event_at_index(0, |event| {
3423 event.metadata.expect_content_exact("test log")?;
3424 event.metadata.expect_field_count(1)?;
3425 event.metadata.expect_field_exact("field", "test field")?;
3426 event.expect_span_count(1)?;
3427 event.expect_span_at_index(0, |span| {
3428 span.expect_content_exact("task")?;
3429 span.expect_field_count(1)?;
3430 span.expect_field_exact("name", "test")
3431 })
3432 })
3433 .unwrap();
3434
3435 info_traces
3436 .expect_event_at_index(1, |event| {
3437 event.metadata.expect_content_exact("inner log")?;
3438 event.metadata.expect_field_count(0)?;
3439 event.expect_span_count(1)?;
3440 event.expect_span_at_index(0, |span| {
3441 span.expect_content_exact("task")?;
3442 span.expect_field_count(1)?;
3443 span.expect_field_exact("name", "test_inner")
3444 })
3445 })
3446 .unwrap();
3447 }
3448
3449 #[test]
3450 fn test_deterministic_resolver() {
3451 let executor = deterministic::Runner::default();
3452 executor.start(|context| async move {
3453 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3455 let ip2: IpAddr = "192.168.1.2".parse().unwrap();
3456 context.resolver_register("example.com", Some(vec![ip1, ip2]));
3457
3458 let addrs = context.resolve("example.com").await.unwrap();
3460 assert_eq!(addrs, vec![ip1, ip2]);
3461
3462 let result = context.resolve("unknown.com").await;
3464 assert!(matches!(result, Err(Error::ResolveFailed(_))));
3465
3466 context.resolver_register("example.com", None);
3468 let result = context.resolve("example.com").await;
3469 assert!(matches!(result, Err(Error::ResolveFailed(_))));
3470 });
3471 }
3472
3473 #[test]
3474 fn test_tokio_error_future() {
3475 let runner = tokio::Runner::default();
3476 test_error_future(runner);
3477 }
3478
3479 #[test]
3480 fn test_tokio_clock_sleep() {
3481 let executor = tokio::Runner::default();
3482 test_clock_sleep(executor);
3483 }
3484
3485 #[test]
3486 fn test_tokio_clock_sleep_until() {
3487 let executor = tokio::Runner::default();
3488 test_clock_sleep_until(executor);
3489 }
3490
3491 #[test]
3492 fn test_tokio_clock_timeout() {
3493 let executor = tokio::Runner::default();
3494 test_clock_timeout(executor);
3495 }
3496
3497 #[test]
3498 fn test_tokio_root_finishes() {
3499 let executor = tokio::Runner::default();
3500 test_root_finishes(executor);
3501 }
3502
3503 #[test]
3504 fn test_tokio_spawn_after_abort() {
3505 let executor = tokio::Runner::default();
3506 test_spawn_after_abort(executor);
3507 }
3508
3509 #[test]
3510 fn test_tokio_spawn_abort() {
3511 let executor = tokio::Runner::default();
3512 test_spawn_abort(executor, false, false);
3513 }
3514
3515 #[test]
3516 #[should_panic(expected = "blah")]
3517 fn test_tokio_panic_aborts_root() {
3518 let executor = tokio::Runner::default();
3519 test_panic_aborts_root(executor);
3520 }
3521
3522 #[test]
3523 #[should_panic(expected = "blah")]
3524 fn test_tokio_panic_aborts_root_caught() {
3525 let cfg = tokio::Config::default().with_catch_panics(true);
3526 let executor = tokio::Runner::new(cfg);
3527 test_panic_aborts_root(executor);
3528 }
3529
3530 #[test]
3531 #[should_panic(expected = "blah")]
3532 fn test_tokio_panic_aborts_spawn() {
3533 let executor = tokio::Runner::default();
3534 test_panic_aborts_spawn(executor);
3535 }
3536
3537 #[test]
3538 fn test_tokio_panic_aborts_spawn_caught() {
3539 let cfg = tokio::Config::default().with_catch_panics(true);
3540 let executor = tokio::Runner::new(cfg);
3541 test_panic_aborts_spawn_caught(executor);
3542 }
3543
3544 #[test]
3545 #[should_panic(expected = "boom")]
3546 fn test_tokio_multiple_panics() {
3547 let executor = tokio::Runner::default();
3548 test_multiple_panics(executor);
3549 }
3550
3551 #[test]
3552 fn test_tokio_multiple_panics_caught() {
3553 let cfg = tokio::Config::default().with_catch_panics(true);
3554 let executor = tokio::Runner::new(cfg);
3555 test_multiple_panics_caught(executor);
3556 }
3557
3558 #[test]
3559 fn test_tokio_select() {
3560 let executor = tokio::Runner::default();
3561 test_select(executor);
3562 }
3563
3564 #[test]
3565 fn test_tokio_select_loop() {
3566 let executor = tokio::Runner::default();
3567 test_select_loop(executor);
3568 }
3569
3570 #[test]
3571 fn test_tokio_storage_operations() {
3572 let executor = tokio::Runner::default();
3573 test_storage_operations(executor);
3574 }
3575
3576 #[test]
3577 fn test_tokio_blob_read_write() {
3578 let executor = tokio::Runner::default();
3579 test_blob_read_write(executor);
3580 }
3581
3582 #[test]
3583 fn test_tokio_blob_resize() {
3584 let executor = tokio::Runner::default();
3585 test_blob_resize(executor);
3586 }
3587
3588 #[test]
3589 fn test_tokio_many_partition_read_write() {
3590 let executor = tokio::Runner::default();
3591 test_many_partition_read_write(executor);
3592 }
3593
3594 #[test]
3595 fn test_tokio_blob_read_past_length() {
3596 let executor = tokio::Runner::default();
3597 test_blob_read_past_length(executor);
3598 }
3599
3600 #[test]
3601 fn test_tokio_blob_clone_and_concurrent_read() {
3602 let executor = tokio::Runner::default();
3604 test_blob_clone_and_concurrent_read(executor);
3605 }
3606
3607 #[test]
3608 fn test_tokio_shutdown() {
3609 let executor = tokio::Runner::default();
3610 test_shutdown(executor);
3611 }
3612
3613 #[test]
3614 fn test_tokio_shutdown_multiple_signals() {
3615 let executor = tokio::Runner::default();
3616 test_shutdown_multiple_signals(executor);
3617 }
3618
3619 #[test]
3620 fn test_tokio_shutdown_timeout() {
3621 let executor = tokio::Runner::default();
3622 test_shutdown_timeout(executor);
3623 }
3624
3625 #[test]
3626 fn test_tokio_shutdown_multiple_stop_calls() {
3627 let executor = tokio::Runner::default();
3628 test_shutdown_multiple_stop_calls(executor);
3629 }
3630
3631 #[test]
3632 fn test_tokio_unfulfilled_shutdown() {
3633 let executor = tokio::Runner::default();
3634 test_unfulfilled_shutdown(executor);
3635 }
3636
3637 #[test]
3638 fn test_tokio_spawn_dedicated() {
3639 let executor = tokio::Runner::default();
3640 test_spawn_dedicated(executor);
3641 }
3642
3643 #[test]
3644 fn test_tokio_spawn() {
3645 let runner = tokio::Runner::default();
3646 test_spawn(runner);
3647 }
3648
3649 #[test]
3650 fn test_tokio_spawn_abort_on_parent_abort() {
3651 let runner = tokio::Runner::default();
3652 test_spawn_abort_on_parent_abort(runner);
3653 }
3654
3655 #[test]
3656 fn test_tokio_spawn_abort_on_parent_completion() {
3657 let runner = tokio::Runner::default();
3658 test_spawn_abort_on_parent_completion(runner);
3659 }
3660
3661 #[test]
3662 fn test_tokio_spawn_cascading_abort() {
3663 let runner = tokio::Runner::default();
3664 test_spawn_cascading_abort(runner);
3665 }
3666
3667 #[test]
3668 fn test_tokio_child_survives_sibling_completion() {
3669 let runner = tokio::Runner::default();
3670 test_child_survives_sibling_completion(runner);
3671 }
3672
3673 #[test]
3674 fn test_tokio_spawn_clone_chain() {
3675 let runner = tokio::Runner::default();
3676 test_spawn_clone_chain(runner);
3677 }
3678
3679 #[test]
3680 fn test_tokio_spawn_sparse_clone_chain() {
3681 let runner = tokio::Runner::default();
3682 test_spawn_sparse_clone_chain(runner);
3683 }
3684
3685 #[test]
3686 fn test_tokio_spawn_blocking() {
3687 for dedicated in [false, true] {
3688 let executor = tokio::Runner::default();
3689 test_spawn_blocking(executor, dedicated);
3690 }
3691 }
3692
3693 #[test]
3694 #[should_panic(expected = "blocking task panicked")]
3695 fn test_tokio_spawn_blocking_panic() {
3696 for dedicated in [false, true] {
3697 let executor = tokio::Runner::default();
3698 test_spawn_blocking_panic(executor, dedicated);
3699 }
3700 }
3701
3702 #[test]
3703 fn test_tokio_spawn_blocking_panic_caught() {
3704 for dedicated in [false, true] {
3705 let cfg = tokio::Config::default().with_catch_panics(true);
3706 let executor = tokio::Runner::new(cfg);
3707 test_spawn_blocking_panic_caught(executor, dedicated);
3708 }
3709 }
3710
3711 #[test]
3712 fn test_tokio_spawn_blocking_abort() {
3713 for (dedicated, blocking) in [(false, true), (true, false)] {
3714 let executor = tokio::Runner::default();
3715 test_spawn_abort(executor, dedicated, blocking);
3716 }
3717 }
3718
3719 #[test]
3720 fn test_tokio_circular_reference_prevents_cleanup() {
3721 let executor = tokio::Runner::default();
3722 test_circular_reference_prevents_cleanup(executor);
3723 }
3724
3725 #[test]
3726 fn test_tokio_late_waker() {
3727 let executor = tokio::Runner::default();
3728 test_late_waker(executor);
3729 }
3730
3731 #[test]
3732 fn test_tokio_metrics() {
3733 let executor = tokio::Runner::default();
3734 test_metrics(executor);
3735 }
3736
3737 #[test]
3738 fn test_tokio_process_rss_metric() {
3739 let executor = tokio::Runner::default();
3740 executor.start(|context| async move {
3741 loop {
3742 let metrics = context.encode();
3744 if !metrics.contains("runtime_process_rss") {
3745 context.sleep(Duration::from_millis(100)).await;
3746 continue;
3747 }
3748
3749 for line in metrics.lines() {
3751 if line.starts_with("runtime_process_rss")
3752 && !line.starts_with("runtime_process_rss{")
3753 {
3754 let parts: Vec<&str> = line.split_whitespace().collect();
3755 if parts.len() >= 2 {
3756 let rss_value: i64 =
3757 parts[1].parse().expect("Failed to parse RSS value");
3758 if rss_value > 0 {
3759 return;
3760 }
3761 }
3762 }
3763 }
3764 }
3765 });
3766 }
3767
3768 #[test]
3769 fn test_tokio_telemetry() {
3770 let executor = tokio::Runner::default();
3771 executor.start(|context| async move {
3772 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
3774
3775 tokio::telemetry::init(
3777 context.with_label("metrics"),
3778 tokio::telemetry::Logging {
3779 level: Level::INFO,
3780 json: false,
3781 },
3782 Some(address),
3783 None,
3784 );
3785
3786 let counter: Counter<u64> = Counter::default();
3788 context.register("test_counter", "Test counter", counter.clone());
3789 counter.inc();
3790
3791 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
3793 let mut line = Vec::new();
3794 loop {
3795 let received = stream.recv(1).await?;
3796 let byte = received.coalesce().as_ref()[0];
3797 if byte == b'\n' {
3798 if line.last() == Some(&b'\r') {
3799 line.pop(); }
3801 break;
3802 }
3803 line.push(byte);
3804 }
3805 String::from_utf8(line).map_err(|_| Error::ReadFailed)
3806 }
3807
3808 async fn read_headers<St: Stream>(
3809 stream: &mut St,
3810 ) -> Result<HashMap<String, String>, Error> {
3811 let mut headers = HashMap::new();
3812 loop {
3813 let line = read_line(stream).await?;
3814 if line.is_empty() {
3815 break;
3816 }
3817 let parts: Vec<&str> = line.splitn(2, ": ").collect();
3818 if parts.len() == 2 {
3819 headers.insert(parts[0].to_string(), parts[1].to_string());
3820 }
3821 }
3822 Ok(headers)
3823 }
3824
3825 async fn read_body<St: Stream>(
3826 stream: &mut St,
3827 content_length: usize,
3828 ) -> Result<String, Error> {
3829 let received = stream.recv(content_length).await?;
3830 String::from_utf8(received.coalesce().into()).map_err(|_| Error::ReadFailed)
3831 }
3832
3833 let client_handle = context
3835 .with_label("client")
3836 .spawn(move |context| async move {
3837 let (mut sink, mut stream) = loop {
3838 match context.dial(address).await {
3839 Ok((sink, stream)) => break (sink, stream),
3840 Err(e) => {
3841 error!(err =?e, "failed to connect");
3843 context.sleep(Duration::from_millis(10)).await;
3844 }
3845 }
3846 };
3847
3848 let request = format!(
3850 "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
3851 );
3852 sink.send(Bytes::from(request)).await.unwrap();
3853
3854 let status_line = read_line(&mut stream).await.unwrap();
3856 assert_eq!(status_line, "HTTP/1.1 200 OK");
3857
3858 let headers = read_headers(&mut stream).await.unwrap();
3860 println!("Headers: {headers:?}");
3861 let content_length = headers
3862 .get("content-length")
3863 .unwrap()
3864 .parse::<usize>()
3865 .unwrap();
3866
3867 let body = read_body(&mut stream, content_length).await.unwrap();
3869 assert!(body.contains("test_counter_total 1"));
3870 });
3871
3872 client_handle.await.unwrap();
3874 });
3875 }
3876
3877 #[test]
3878 fn test_tokio_resolver() {
3879 let executor = tokio::Runner::default();
3880 executor.start(|context| async move {
3881 let addrs = context.resolve("localhost").await.unwrap();
3882 assert!(!addrs.is_empty());
3883 for addr in addrs {
3884 assert!(
3885 addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
3886 || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
3887 );
3888 }
3889 });
3890 }
3891
3892 #[test]
3893 fn test_create_thread_pool_tokio() {
3894 let executor = tokio::Runner::default();
3895 executor.start(|context| async move {
3896 let pool = context
3898 .with_label("pool")
3899 .create_thread_pool(NZUsize!(4))
3900 .unwrap();
3901
3902 let v: Vec<_> = (0..10000).collect();
3904
3905 pool.install(|| {
3907 assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3908 });
3909 });
3910 }
3911
3912 #[test]
3913 fn test_create_thread_pool_deterministic() {
3914 let executor = deterministic::Runner::default();
3915 executor.start(|context| async move {
3916 let pool = context
3918 .with_label("pool")
3919 .create_thread_pool(NZUsize!(4))
3920 .unwrap();
3921
3922 let v: Vec<_> = (0..10000).collect();
3924
3925 pool.install(|| {
3927 assert_eq!(v.par_iter().sum::<i32>(), 10000 * 9999 / 2);
3928 });
3929 });
3930 }
3931
3932 fn test_buffer_pooler<R: Runner>(
3933 runner: R,
3934 expected_network_max_per_class: usize,
3935 expected_storage_max_per_class: usize,
3936 ) where
3937 R::Context: BufferPooler,
3938 {
3939 runner.start(|context| async move {
3940 let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
3942 assert!(net_buf.capacity() >= 1024);
3943
3944 let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
3946 assert!(storage_buf.capacity() >= 4096);
3947
3948 assert_eq!(
3950 context.network_buffer_pool().config().max_per_class.get(),
3951 expected_network_max_per_class
3952 );
3953 assert_eq!(
3954 context.storage_buffer_pool().config().max_per_class.get(),
3955 expected_storage_max_per_class
3956 );
3957 });
3958 }
3959
3960 #[test]
3961 fn test_deterministic_buffer_pooler() {
3962 test_buffer_pooler(deterministic::Runner::default(), 4096, 32);
3963
3964 let runner = deterministic::Runner::new(
3965 deterministic::Config::default()
3966 .with_network_buffer_pool_config(
3967 BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
3968 )
3969 .with_storage_buffer_pool_config(
3970 BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
3971 ),
3972 );
3973 test_buffer_pooler(runner, 64, 8);
3974 }
3975
3976 #[test]
3977 fn test_tokio_buffer_pooler() {
3978 test_buffer_pooler(tokio::Runner::default(), 4096, 32);
3979
3980 let runner = tokio::Runner::new(
3981 tokio::Config::default()
3982 .with_network_buffer_pool_config(
3983 BufferPoolConfig::for_network().with_max_per_class(NZUsize!(64)),
3984 )
3985 .with_storage_buffer_pool_config(
3986 BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(8)),
3987 ),
3988 );
3989 test_buffer_pooler(runner, 64, 8);
3990 }
3991}