1#![doc(
20 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
21 html_favicon_url = "https://commonware.xyz/favicon.ico"
22)]
23
24use commonware_macros::stability_scope;
25
26#[macro_use]
27mod macros;
28
29mod network;
30mod process;
31mod storage;
32
33stability_scope!(ALPHA {
34 pub mod deterministic;
35 pub mod mocks;
36});
37stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
38 pub mod benchmarks;
39});
40stability_scope!(ALPHA, cfg(any(feature = "iouring-storage", feature = "iouring-network")) {
41 mod iouring;
42});
43stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
44 pub mod tokio;
45});
46stability_scope!(BETA {
47 use commonware_macros::select;
48 use commonware_parallel::{Rayon, ThreadPool};
49 use iobuf::PoolError;
50 use prometheus_client::registry::Metric;
51 use rayon::ThreadPoolBuildError;
52 use std::{
53 future::Future,
54 io::Error as IoError,
55 net::SocketAddr,
56 num::NonZeroUsize,
57 time::{Duration, SystemTime},
58 };
59 use thiserror::Error;
60
    /// Label prefix reserved for the runtime's own metrics.
    ///
    /// `Metrics::scoped_label` panics when a computed label starts with this prefix.
    pub(crate) const METRICS_PREFIX: &str = "runtime";
63
64 pub use bytes::{Buf, BufMut};
66 pub use governor::Quota;
68
69 pub mod iobuf;
70 pub use iobuf::{BufferPool, BufferPoolConfig, IoBuf, IoBufMut, IoBufs, IoBufsMut};
71
72 pub mod utils;
73 pub use utils::*;
74
75 pub mod telemetry;
76
    /// Blob version used by `Storage::open`, which requests exactly the range
    /// `DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION`.
    pub const DEFAULT_BLOB_VERSION: u16 = 0;
79
    /// Errors surfaced by the runtime traits in this crate (tasks, clock, network, storage and
    /// buffer pools).
    #[derive(Error, Debug)]
    pub enum Error {
        /// The tests in this file observe this variant when awaiting the handle of a task that
        /// panicked (with the panic caught by the runtime).
        #[error("exited")]
        Exited,
        /// The tests in this file observe this variant when awaiting the handle of an aborted
        /// task.
        #[error("closed")]
        Closed,
        /// Produced by `Clock::timeout` when the sleep finishes first; the tests also observe it
        /// from `Spawner::stop` when its timeout elapses.
        #[error("timeout")]
        Timeout,
        // NOTE(review): the network variants below are presumably produced by `Network`,
        // `Sink` and `Stream` implementations — none are constructed in this file.
        #[error("bind failed")]
        BindFailed,
        #[error("connection failed")]
        ConnectionFailed,
        #[error("write failed")]
        WriteFailed,
        #[error("read failed")]
        ReadFailed,
        #[error("send failed")]
        SendFailed,
        #[error("recv failed")]
        RecvFailed,
        /// DNS resolution failed; the payload is included in the message.
        #[error("dns resolution failed: {0}")]
        ResolveFailed(String),
        /// The partition name contains a character outside the allowed set; the payload is
        /// included in the message.
        #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
        PartitionNameInvalid(String),
        #[error("partition creation failed: {0}")]
        PartitionCreationFailed(String),
        /// The tests observe this variant when scanning a partition that has been removed.
        #[error("partition missing: {0}")]
        PartitionMissing(String),
        #[error("partition corrupt: {0}")]
        PartitionCorrupt(String),
        // NOTE(review): in the blob variants the two strings are rendered as `{0}/{1}` —
        // presumably partition and blob name; confirm against the implementations.
        #[error("blob open failed: {0}/{1} error: {2}")]
        BlobOpenFailed(String, String, IoError),
        #[error("blob missing: {0}/{1}")]
        BlobMissing(String, String),
        #[error("blob resize failed: {0}/{1} error: {2}")]
        BlobResizeFailed(String, String, IoError),
        #[error("blob sync failed: {0}/{1} error: {2}")]
        BlobSyncFailed(String, String, IoError),
        #[error("blob insufficient length")]
        BlobInsufficientLength,
        /// The third field is a free-form reason, rendered after `reason:`.
        #[error("blob corrupt: {0}/{1} reason: {2}")]
        BlobCorrupt(String, String, String),
        /// The blob's version (`found`) is outside the accepted range (`expected`).
        #[error("blob version mismatch: expected one of {expected:?}, found {found}")]
        BlobVersionMismatch {
            expected: std::ops::RangeInclusive<u16>,
            found: u16,
        },
        #[error("invalid or missing checksum")]
        InvalidChecksum,
        #[error("offset overflow")]
        OffsetOverflow,
        #[error("immutable blob")]
        ImmutableBlob,
        /// Any other I/O error; `#[from]` lets `?` convert a `std::io::Error` into this variant.
        #[error("io error: {0}")]
        Io(#[from] IoError),
        /// Buffer pool error; `#[from]` lets `?` convert a `PoolError` into this variant.
        #[error("buffer pool: {0}")]
        Pool(#[from] PoolError),
    }
139
    /// Entry point of a runtime: consumes itself to drive a root future to completion.
    pub trait Runner {
        /// Context handed to the closure that builds the root future.
        type Context;

        /// Calls `f` with a [`Self::Context`] and returns the output of the future it produces.
        ///
        /// The tests in this file rely on the output being returned as-is (for example an
        /// `Err` produced by the root future is handed back to the caller).
        fn start<F, Fut>(self, f: F) -> Fut::Output
        where
            F: FnOnce(Self::Context) -> Fut,
            Fut: Future;
    }
160
    /// Interface for spawning tasks and coordinating shutdown.
    pub trait Spawner: Clone + Send + Sync + 'static {
        /// Returns a spawner configured for "shared" execution.
        ///
        /// NOTE(review): presumably tasks run on a shared executor and `blocking` marks work
        /// that may block — confirm against the implementations.
        fn shared(self, blocking: bool) -> Self;

        /// Returns a spawner configured for "dedicated" execution.
        ///
        /// NOTE(review): presumably the next task gets its own thread — confirm against the
        /// implementations.
        fn dedicated(self) -> Self;

        /// Returns a spawner with instrumentation enabled.
        ///
        /// NOTE(review): what is instrumented is not visible in this file — confirm.
        fn instrumented(self) -> Self;

        /// Spawns the future built by `f`, which receives a spawner of the same type, and
        /// returns a `Handle` that resolves to the task's output.
        ///
        /// Behavior the tests in this file expect from implementations:
        /// - awaiting the handle of an aborted task yields `Err(Error::Closed)`;
        /// - awaiting the handle of a task that panicked (panic caught) yields
        ///   `Err(Error::Exited)`;
        /// - tasks spawned from the context passed to `f` are closed when the parent task
        ///   is aborted or completes.
        fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
        where
            F: FnOnce(Self) -> Fut + Send + 'static,
            Fut: Future<Output = T> + Send + 'static,
            T: Send + 'static;

        /// Signals shutdown with `value` and resolves once shutdown has been acknowledged.
        ///
        /// Behavior the tests in this file expect from implementations:
        /// - the future does not resolve while a task still holds a signal from
        ///   [`Spawner::stopped`];
        /// - with `timeout = Some(..)`, `Err(Error::Timeout)` is returned if that does not
        ///   happen in time;
        /// - a later call with a different value still returns `Ok(())`, and the value
        ///   observed through [`Spawner::stopped`] stays the first one.
        fn stop(
            self,
            value: i32,
            timeout: Option<Duration>,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Returns a signal that resolves to the value passed to [`Spawner::stop`].
        ///
        /// The tests obtain it both before and after `stop` is called and read the same value.
        fn stopped(&self) -> signal::Signal;
    }
256
257 pub trait ThreadPooler: Spawner + Metrics {
260 fn create_thread_pool(
269 &self,
270 concurrency: NonZeroUsize,
271 ) -> Result<ThreadPool, ThreadPoolBuildError>;
272
273 fn create_strategy(
282 &self,
283 concurrency: NonZeroUsize,
284 ) -> Result<Rayon, ThreadPoolBuildError> {
285 self.create_thread_pool(concurrency).map(Rayon::with_pool)
286 }
287 }
288
289 pub trait Metrics: Clone + Send + Sync + 'static {
291 fn label(&self) -> String;
293
294 fn with_label(&self, label: &str) -> Self;
302
303 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self;
390
391 fn scoped_label(&self, label: &str) -> String {
395 let label = if self.label().is_empty() {
396 label.to_string()
397 } else {
398 format!("{}_{}", self.label(), label)
399 };
400 assert!(
401 !label.starts_with(METRICS_PREFIX),
402 "using runtime label is not allowed"
403 );
404 label
405 }
406
407 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
413
414 fn encode(&self) -> String;
421 }
422
    /// A `governor` rate limiter that is not keyed, keeps its state in memory, reads time from
    /// the clock `C`, and uses the no-op middleware.
    pub type RateLimiter<C> = governor::RateLimiter<
        governor::state::NotKeyed,
        governor::state::InMemoryState,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
433
    /// A `governor` rate limiter keyed by `K` (state stored in a `HashMapStateStore<K>`), reading
    /// time from the clock `C` and using the no-op middleware.
    pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
        K,
        governor::state::keyed::HashMapStateStore<K>,
        C,
        governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
    >;
446
    /// Source of time for a runtime.
    ///
    /// Every implementation is also a `governor` clock whose instants are [`SystemTime`]s, so it
    /// can drive the rate limiter aliases defined in this file.
    pub trait Clock:
        governor::clock::Clock<Instant = SystemTime>
        + governor::clock::ReasonablyRealtime
        + Clone
        + Send
        + Sync
        + 'static
    {
        /// Returns the current time according to this clock.
        fn current(&self) -> SystemTime;

        /// Returns a future that completes after `duration` has passed on this clock.
        fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

        /// Returns a future that completes once this clock reaches `deadline`.
        fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

        /// Races `future` against a sleep of `duration`.
        ///
        /// Resolves to `Ok(output)` when `future` completes first.
        ///
        /// # Errors
        ///
        /// Resolves to `Err(Error::Timeout)` when the sleep completes first.
        fn timeout<F, T>(
            &self,
            duration: Duration,
            future: F,
        ) -> impl Future<Output = Result<T, Error>> + Send + '_
        where
            F: Future<Output = T> + Send + 'static,
            T: Send + 'static,
        {
            async move {
                // `future` is listed first; the `test_select` helper in this file shows the
                // first listed branch being taken when both branches are immediately ready.
                select! {
                    result = future => Ok(result),
                    _ = self.sleep(duration) => Err(Error::Timeout),
                }
            }
        }
    }
506
    /// The [`Sink`] type produced by the listener of network `N`.
    pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;

    /// The [`Stream`] type produced by the listener of network `N`.
    pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;

    /// The [`Listener`] type of network `N`.
    pub type ListenerOf<N> = <N as crate::Network>::Listener;
515
    /// Interface for accepting and opening network connections.
    pub trait Network: Clone + Send + Sync + 'static {
        /// Listener returned by [`Network::bind`]; its `Sink`/`Stream` types are also the
        /// ones returned by [`Network::dial`].
        type Listener: Listener;

        /// Binds to `socket` and returns a listener for incoming connections.
        fn bind(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

        /// Connects to `socket` and returns the sending and receiving halves of the
        /// connection.
        fn dial(
            &self,
            socket: SocketAddr,
        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
    }
536
    /// Interface for resolving host names to IP addresses.
    pub trait Resolver: Clone + Send + Sync + 'static {
        /// Resolves `host` to a list of IP addresses.
        ///
        /// NOTE(review): failures are presumably reported as `Error::ResolveFailed` — confirm
        /// against the implementations.
        fn resolve(
            &self,
            host: &str,
        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
    }
547
    /// Interface for accepting incoming connections on a bound socket.
    pub trait Listener: Sync + Send + 'static {
        /// Sending half of an accepted connection.
        type Sink: Sink;
        /// Receiving half of an accepted connection.
        type Stream: Stream;

        /// Waits for the next connection and returns the peer address together with the
        /// connection's sending and receiving halves.
        fn accept(
            &mut self,
        ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

        /// Returns the local address of this listener.
        fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
    }
566
    /// Sending half of a connection.
    pub trait Sink: Sync + Send + 'static {
        /// Sends `buf` (anything convertible into [`IoBufs`]) over the connection.
        ///
        /// NOTE(review): whether a returned `Ok(())` implies the whole buffer was written is
        /// not visible in this file — confirm against the implementations.
        fn send(
            &mut self,
            buf: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
580
    /// Receiving half of a connection.
    pub trait Stream: Sync + Send + 'static {
        /// Receives data from the connection and returns it as [`IoBufs`].
        ///
        /// NOTE(review): presumably resolves only once exactly `len` bytes are available —
        /// confirm against the implementations.
        fn recv(&mut self, len: u64) -> impl Future<Output = Result<IoBufs, Error>> + Send;

        /// Returns a slice of bytes without requiring `&mut self`.
        ///
        /// NOTE(review): presumably at most `max_len` already-buffered bytes, left unconsumed —
        /// confirm against the implementations.
        fn peek(&self, max_len: u64) -> &[u8];
    }
602
603 pub trait Storage: Clone + Send + Sync + 'static {
616 type Blob: Blob;
618
619 fn open(
622 &self,
623 partition: &str,
624 name: &[u8],
625 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
626 async move {
627 let (blob, size, _) = self
628 .open_versioned(partition, name, DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION)
629 .await?;
630 Ok((blob, size))
631 }
632 }
633
634 fn open_versioned(
651 &self,
652 partition: &str,
653 name: &[u8],
654 versions: std::ops::RangeInclusive<u16>,
655 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;
656
657 fn remove(
663 &self,
664 partition: &str,
665 name: Option<&[u8]>,
666 ) -> impl Future<Output = Result<(), Error>> + Send;
667
668 fn scan(&self, partition: &str)
670 -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
671 }
672
    /// Interface for reading and writing a single blob of bytes at arbitrary offsets.
    ///
    /// Handles are cloneable; the tests in this file read the same blob from several clones
    /// concurrently.
    #[allow(clippy::len_without_is_empty)]
    pub trait Blob: Clone + Send + Sync + 'static {
        /// Reads into `buf` starting at `offset` and returns the filled buffer.
        ///
        /// The tests in this file expect an error when the requested range extends past the
        /// end of the blob.
        fn read_at(
            &self,
            offset: u64,
            buf: impl Into<IoBufsMut> + Send,
        ) -> impl Future<Output = Result<IoBufsMut, Error>> + Send;

        /// Writes `buf` starting at `offset`.
        ///
        /// The tests write past the current end (growing the blob) and over existing bytes.
        fn write_at(
            &self,
            offset: u64,
            buf: impl Into<IoBufs> + Send,
        ) -> impl Future<Output = Result<(), Error>> + Send;

        /// Changes the blob's length to `len`.
        ///
        /// The tests expect growth to expose zero bytes and shrinking to keep the prefix.
        fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

        /// Flushes pending changes.
        ///
        /// NOTE(review): presumably durable once this resolves — confirm against the
        /// implementations.
        fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
    }
721
    /// Interface giving access to the buffer pools owned by a runtime context.
    pub trait BufferPooler: Clone + Send + Sync + 'static {
        /// Returns the pool intended for network buffers.
        fn network_buffer_pool(&self) -> &BufferPool;

        /// Returns the pool intended for storage buffers.
        fn storage_buffer_pool(&self) -> &BufferPool;
    }
730});
731stability_scope!(ALPHA, cfg(feature = "external") {
    /// A [`Clock`] that can "pace" a future with a given latency.
    ///
    /// NOTE(review): only compiled with the `external` feature; presumably used to model the
    /// latency of work that happens outside the runtime — confirm against the implementations.
    pub trait Pacer: Clock + Clone + Send + Sync + 'static {
        /// Wraps `future` so that it is paced by `latency`, yielding the same output.
        ///
        /// The returned future borrows `self` for `'a`.
        fn pace<'a, F, T>(
            &'a self,
            latency: Duration,
            future: F,
        ) -> impl Future<Output = T> + Send + 'a
        where
            F: Future<Output = T> + Send + 'a,
            T: Send + 'a;
    }
762
    /// Extension trait adding a method-call form of [`Pacer::pace`] to every `Send` future.
    pub trait FutureExt: Future + Send + Sized {
        /// Equivalent to `pacer.pace(latency, self)`.
        fn pace<'a, E>(
            self,
            pacer: &'a E,
            latency: Duration,
        ) -> impl Future<Output = Self::Output> + Send + 'a
        where
            E: Pacer + 'a,
            Self: Send + 'a,
            Self::Output: Send + 'a,
        {
            pacer.pace(latency, self)
        }
    }

    // Blanket implementation: every `Send` future gets `FutureExt::pace`.
    impl<F> FutureExt for F where F: Future + Send {}
784});
785
786#[cfg(test)]
787mod tests {
788 use super::*;
789 use crate::telemetry::traces::collector::TraceStorage;
790 use bytes::Bytes;
791 use commonware_macros::{select, test_collect_traces};
792 use commonware_utils::{
793 channel::{mpsc, oneshot},
794 NZUsize,
795 };
796 use futures::{
797 future::{pending, ready},
798 join, pin_mut, FutureExt,
799 };
800 use prometheus_client::{
801 encoding::EncodeLabelSet,
802 metrics::{counter::Counter, family::Family},
803 };
804 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
805 use std::{
806 collections::HashMap,
807 net::{IpAddr, Ipv4Addr, Ipv6Addr},
808 pin::Pin,
809 str::FromStr,
810 sync::{
811 atomic::{AtomicU32, Ordering},
812 Arc, Mutex,
813 },
814 task::{Context as TContext, Poll, Waker},
815 };
816 use tracing::{error, Level};
817 use utils::reschedule;
818
819 fn test_error_future<R: Runner>(runner: R) {
820 async fn error_future() -> Result<&'static str, &'static str> {
821 Err("An error occurred")
822 }
823 let result = runner.start(|_| error_future());
824 assert_eq!(result, Err("An error occurred"));
825 }
826
827 fn test_clock_sleep<R: Runner>(runner: R)
828 where
829 R::Context: Spawner + Clock,
830 {
831 runner.start(|context| async move {
832 let start = context.current();
834 let sleep_duration = Duration::from_millis(10);
835 context.sleep(sleep_duration).await;
836
837 let end = context.current();
839 assert!(end.duration_since(start).unwrap() >= sleep_duration);
840 });
841 }
842
843 fn test_clock_sleep_until<R: Runner>(runner: R)
844 where
845 R::Context: Spawner + Clock + Metrics,
846 {
847 runner.start(|context| async move {
848 let now = context.current();
850 context.sleep_until(now + Duration::from_millis(100)).await;
851
852 let elapsed = now.elapsed().unwrap();
854 assert!(elapsed >= Duration::from_millis(100));
855 });
856 }
857
858 fn test_clock_timeout<R: Runner>(runner: R)
859 where
860 R::Context: Spawner + Clock,
861 {
862 runner.start(|context| async move {
863 let result = context
865 .timeout(Duration::from_millis(100), async { "success" })
866 .await;
867 assert_eq!(result.unwrap(), "success");
868
869 let result = context
871 .timeout(Duration::from_millis(50), pending::<()>())
872 .await;
873 assert!(matches!(result, Err(Error::Timeout)));
874
875 let result = context
877 .timeout(
878 Duration::from_millis(100),
879 context.sleep(Duration::from_millis(50)),
880 )
881 .await;
882 assert!(result.is_ok());
883 });
884 }
885
    /// Checks that `start` returns once the root future finishes even though a spawned task
    /// is still looping (there are no assertions; the helper passes if `start` returns).
    fn test_root_finishes<R: Runner>(runner: R)
    where
        R::Context: Spawner,
    {
        runner.start(|context| async move {
            // Spawn a task that yields forever; its handle is dropped without being awaited.
            context.spawn(|_| async move {
                loop {
                    reschedule().await;
                }
            });
        });
    }
898
899 fn test_spawn_after_abort<R>(runner: R)
900 where
901 R: Runner,
902 R::Context: Spawner + Clone,
903 {
904 runner.start(|context| async move {
905 let child = context.clone();
907
908 let parent_handle = context.spawn(move |_| async move {
910 pending::<()>().await;
911 });
912 parent_handle.abort();
913
914 let child_handle = child.spawn(move |_| async move {
916 pending::<()>().await;
917 });
918 assert!(matches!(child_handle.await, Err(Error::Closed)));
919 });
920 }
921
922 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
923 where
924 R::Context: Spawner,
925 {
926 runner.start(|context| async move {
927 let context = if dedicated {
928 assert!(!blocking);
929 context.dedicated()
930 } else {
931 context.shared(blocking)
932 };
933
934 let handle = context.spawn(|_| async move {
935 loop {
936 reschedule().await;
937 }
938 });
939 handle.abort();
940 assert!(matches!(handle.await, Err(Error::Closed)));
941 });
942 }
943
944 fn test_panic_aborts_root<R: Runner>(runner: R) {
945 let result: Result<(), Error> = runner.start(|_| async move {
946 panic!("blah");
947 });
948 result.unwrap_err();
949 }
950
951 fn test_panic_aborts_spawn<R: Runner>(runner: R)
952 where
953 R::Context: Spawner + Clock,
954 {
955 runner.start(|context| async move {
956 context.clone().spawn(|_| async move {
957 panic!("blah");
958 });
959
960 loop {
962 context.sleep(Duration::from_millis(100)).await;
963 }
964 });
965 }
966
967 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
968 where
969 R::Context: Spawner + Clock,
970 {
971 let result: Result<(), Error> = runner.start(|context| async move {
972 let result = context.clone().spawn(|_| async move {
973 panic!("blah");
974 });
975 result.await
976 });
977 assert!(matches!(result, Err(Error::Exited)));
978 }
979
980 fn test_multiple_panics<R: Runner>(runner: R)
981 where
982 R::Context: Spawner + Clock,
983 {
984 runner.start(|context| async move {
985 context.clone().spawn(|_| async move {
986 panic!("boom 1");
987 });
988 context.clone().spawn(|_| async move {
989 panic!("boom 2");
990 });
991 context.clone().spawn(|_| async move {
992 panic!("boom 3");
993 });
994
995 loop {
997 context.sleep(Duration::from_millis(100)).await;
998 }
999 });
1000 }
1001
1002 fn test_multiple_panics_caught<R: Runner>(runner: R)
1003 where
1004 R::Context: Spawner + Clock,
1005 {
1006 let (res1, res2, res3) = runner.start(|context| async move {
1007 let handle1 = context.clone().spawn(|_| async move {
1008 panic!("boom 1");
1009 });
1010 let handle2 = context.clone().spawn(|_| async move {
1011 panic!("boom 2");
1012 });
1013 let handle3 = context.clone().spawn(|_| async move {
1014 panic!("boom 3");
1015 });
1016
1017 join!(handle1, handle2, handle3)
1018 });
1019 assert!(matches!(res1, Err(Error::Exited)));
1020 assert!(matches!(res2, Err(Error::Exited)));
1021 assert!(matches!(res3, Err(Error::Exited)));
1022 }
1023
1024 fn test_select<R: Runner>(runner: R) {
1025 runner.start(|_| async move {
1026 let output = Mutex::new(0);
1028 select! {
1029 v1 = ready(1) => {
1030 *output.lock().unwrap() = v1;
1031 },
1032 v2 = ready(2) => {
1033 *output.lock().unwrap() = v2;
1034 },
1035 };
1036 assert_eq!(*output.lock().unwrap(), 1);
1037
1038 select! {
1040 v1 = std::future::pending::<i32>() => {
1041 *output.lock().unwrap() = v1;
1042 },
1043 v2 = ready(2) => {
1044 *output.lock().unwrap() = v2;
1045 },
1046 };
1047 assert_eq!(*output.lock().unwrap(), 2);
1048 });
1049 }
1050
1051 fn test_select_loop<R: Runner>(runner: R)
1053 where
1054 R::Context: Clock,
1055 {
1056 runner.start(|context| async move {
1057 let (sender, mut receiver) = mpsc::unbounded_channel();
1059 for _ in 0..2 {
1060 select! {
1061 v = receiver.recv() => {
1062 panic!("unexpected value: {v:?}");
1063 },
1064 _ = context.sleep(Duration::from_millis(100)) => {
1065 continue;
1066 },
1067 };
1068 }
1069
1070 sender.send(0).unwrap();
1072 sender.send(1).unwrap();
1073
1074 select! {
1076 _ = async {} => {
1077 },
1079 v = receiver.recv() => {
1080 panic!("unexpected value: {v:?}");
1081 },
1082 };
1083
1084 for i in 0..2 {
1086 select! {
1087 _ = context.sleep(Duration::from_millis(100)) => {
1088 panic!("timeout");
1089 },
1090 v = receiver.recv() => {
1091 assert_eq!(v.unwrap(), i);
1092 },
1093 };
1094 }
1095 });
1096 }
1097
1098 fn test_storage_operations<R: Runner>(runner: R)
1099 where
1100 R::Context: Storage,
1101 {
1102 runner.start(|context| async move {
1103 let partition = "test_partition";
1104 let name = b"test_blob";
1105
1106 let (blob, size) = context
1108 .open(partition, name)
1109 .await
1110 .expect("Failed to open blob");
1111 assert_eq!(size, 0, "new blob should have size 0");
1112
1113 let data = b"Hello, Storage!";
1115 blob.write_at(0, data)
1116 .await
1117 .expect("Failed to write to blob");
1118
1119 blob.sync().await.expect("Failed to sync blob");
1121
1122 let read = blob
1124 .read_at(0, IoBufMut::zeroed(data.len()))
1125 .await
1126 .expect("Failed to read from blob");
1127 assert_eq!(read.coalesce(), data);
1128
1129 blob.sync().await.expect("Failed to sync blob");
1131
1132 let blobs = context
1134 .scan(partition)
1135 .await
1136 .expect("Failed to scan partition");
1137 assert!(blobs.contains(&name.to_vec()));
1138
1139 let (blob, len) = context
1141 .open(partition, name)
1142 .await
1143 .expect("Failed to reopen blob");
1144 assert_eq!(len, data.len() as u64);
1145
1146 let read = blob
1148 .read_at(7, IoBufMut::zeroed(7))
1149 .await
1150 .expect("Failed to read data");
1151 assert_eq!(read.coalesce(), b"Storage");
1152
1153 blob.sync().await.expect("Failed to sync blob");
1155
1156 context
1158 .remove(partition, Some(name))
1159 .await
1160 .expect("Failed to remove blob");
1161
1162 let blobs = context
1164 .scan(partition)
1165 .await
1166 .expect("Failed to scan partition");
1167 assert!(!blobs.contains(&name.to_vec()));
1168
1169 context
1171 .remove(partition, None)
1172 .await
1173 .expect("Failed to remove partition");
1174
1175 let result = context.scan(partition).await;
1177 assert!(matches!(result, Err(Error::PartitionMissing(_))));
1178 });
1179 }
1180
1181 fn test_blob_read_write<R: Runner>(runner: R)
1182 where
1183 R::Context: Storage,
1184 {
1185 runner.start(|context| async move {
1186 let partition = "test_partition";
1187 let name = b"test_blob_rw";
1188
1189 let (blob, _) = context
1191 .open(partition, name)
1192 .await
1193 .expect("Failed to open blob");
1194
1195 let data1 = b"Hello";
1197 let data2 = b"World";
1198 blob.write_at(0, data1)
1199 .await
1200 .expect("Failed to write data1");
1201 blob.write_at(5, data2)
1202 .await
1203 .expect("Failed to write data2");
1204
1205 let read = blob
1207 .read_at(0, IoBufMut::zeroed(10))
1208 .await
1209 .expect("Failed to read data");
1210 let read = read.coalesce();
1211 assert_eq!(&read.as_ref()[..5], data1);
1212 assert_eq!(&read.as_ref()[5..], data2);
1213
1214 let result = blob.read_at(10, IoBufMut::zeroed(10)).await;
1216 assert!(result.is_err());
1217
1218 let data3 = b"Store";
1220 blob.write_at(5, data3)
1221 .await
1222 .expect("Failed to write data3");
1223
1224 let read = blob
1226 .read_at(0, IoBufMut::zeroed(10))
1227 .await
1228 .expect("Failed to read data");
1229 let read = read.coalesce();
1230 assert_eq!(&read.as_ref()[..5], data1);
1231 assert_eq!(&read.as_ref()[5..], data3);
1232
1233 let result = blob.read_at(10, IoBufMut::zeroed(10)).await;
1235 assert!(result.is_err());
1236 });
1237 }
1238
1239 fn test_blob_resize<R: Runner>(runner: R)
1240 where
1241 R::Context: Storage,
1242 {
1243 runner.start(|context| async move {
1244 let partition = "test_partition_resize";
1245 let name = b"test_blob_resize";
1246
1247 let (blob, _) = context
1249 .open(partition, name)
1250 .await
1251 .expect("Failed to open blob");
1252
1253 let data = b"some data";
1254 blob.write_at(0, data.to_vec())
1255 .await
1256 .expect("Failed to write");
1257 blob.sync().await.expect("Failed to sync after write");
1258
1259 let (blob, len) = context.open(partition, name).await.unwrap();
1261 assert_eq!(len, data.len() as u64);
1262
1263 let new_len = (data.len() as u64) * 2;
1265 blob.resize(new_len)
1266 .await
1267 .expect("Failed to resize to extend");
1268 blob.sync().await.expect("Failed to sync after resize");
1269
1270 let (blob, len) = context.open(partition, name).await.unwrap();
1272 assert_eq!(len, new_len);
1273
1274 let read_buf = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
1276 assert_eq!(read_buf.coalesce(), data);
1277
1278 let extended_part = blob
1280 .read_at(data.len() as u64, IoBufMut::zeroed(data.len()))
1281 .await
1282 .unwrap();
1283 assert_eq!(extended_part.coalesce(), vec![0; data.len()].as_slice());
1284
1285 blob.resize(data.len() as u64).await.unwrap();
1287 blob.sync().await.unwrap();
1288
1289 let (blob, size) = context.open(partition, name).await.unwrap();
1291 assert_eq!(size, data.len() as u64);
1292
1293 let read_buf = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
1295 assert_eq!(read_buf.coalesce(), data);
1296 blob.sync().await.unwrap();
1297 });
1298 }
1299
1300 fn test_many_partition_read_write<R: Runner>(runner: R)
1301 where
1302 R::Context: Storage,
1303 {
1304 runner.start(|context| async move {
1305 let partitions = ["partition1", "partition2", "partition3"];
1306 let name = b"test_blob_rw";
1307 let data1 = b"Hello";
1308 let data2 = b"World";
1309
1310 for (additional, partition) in partitions.iter().enumerate() {
1311 let (blob, _) = context
1313 .open(partition, name)
1314 .await
1315 .expect("Failed to open blob");
1316
1317 blob.write_at(0, data1)
1319 .await
1320 .expect("Failed to write data1");
1321 blob.write_at(5 + additional as u64, data2)
1322 .await
1323 .expect("Failed to write data2");
1324
1325 blob.sync().await.expect("Failed to sync blob");
1327 }
1328
1329 for (additional, partition) in partitions.iter().enumerate() {
1330 let (blob, len) = context
1332 .open(partition, name)
1333 .await
1334 .expect("Failed to open blob");
1335 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1336
1337 let read = blob
1339 .read_at(0, IoBufMut::zeroed(10 + additional))
1340 .await
1341 .expect("Failed to read data");
1342 let read = read.coalesce();
1343 assert_eq!(&read.as_ref()[..5], b"Hello");
1344 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1345 }
1346 });
1347 }
1348
1349 fn test_blob_read_past_length<R: Runner>(runner: R)
1350 where
1351 R::Context: Storage,
1352 {
1353 runner.start(|context| async move {
1354 let partition = "test_partition";
1355 let name = b"test_blob_rw";
1356
1357 let (blob, _) = context
1359 .open(partition, name)
1360 .await
1361 .expect("Failed to open blob");
1362
1363 let result = blob.read_at(0, IoBufMut::zeroed(10)).await;
1365 assert!(result.is_err());
1366
1367 let data = b"Hello, Storage!".to_vec();
1369 blob.write_at(0, data)
1370 .await
1371 .expect("Failed to write to blob");
1372
1373 let result = blob.read_at(0, IoBufMut::zeroed(20)).await;
1375 assert!(result.is_err());
1376 })
1377 }
1378
1379 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1380 where
1381 R::Context: Spawner + Storage + Metrics,
1382 {
1383 runner.start(|context| async move {
1384 let partition = "test_partition";
1385 let name = b"test_blob_rw";
1386
1387 let (blob, _) = context
1389 .open(partition, name)
1390 .await
1391 .expect("Failed to open blob");
1392
1393 let data = b"Hello, Storage!";
1395 blob.write_at(0, data)
1396 .await
1397 .expect("Failed to write to blob");
1398
1399 blob.sync().await.expect("Failed to sync blob");
1401
1402 let check1 = context.with_label("check1").spawn({
1404 let blob = blob.clone();
1405 let data_len = data.len();
1406 move |_| async move {
1407 let read = blob
1408 .read_at(0, IoBufMut::zeroed(data_len))
1409 .await
1410 .expect("Failed to read from blob");
1411 assert_eq!(read.coalesce(), data);
1412 }
1413 });
1414 let check2 = context.with_label("check2").spawn({
1415 let blob = blob.clone();
1416 let data_len = data.len();
1417 move |_| async move {
1418 let read = blob
1419 .read_at(0, IoBufMut::zeroed(data_len))
1420 .await
1421 .expect("Failed to read from blob");
1422 assert_eq!(read.coalesce(), data);
1423 }
1424 });
1425
1426 let result = join!(check1, check2);
1428 assert!(result.0.is_ok());
1429 assert!(result.1.is_ok());
1430
1431 let read = blob
1433 .read_at(0, IoBufMut::zeroed(data.len()))
1434 .await
1435 .expect("Failed to read from blob");
1436 assert_eq!(read.coalesce(), data);
1437
1438 drop(blob);
1440
1441 let buffer = context.encode();
1443 assert!(buffer.contains("open_blobs 0"));
1444 });
1445 }
1446
1447 fn test_shutdown<R: Runner>(runner: R)
1448 where
1449 R::Context: Spawner + Metrics + Clock,
1450 {
1451 let kill = 9;
1452 runner.start(|context| async move {
1453 let before = context
1455 .with_label("before")
1456 .spawn(move |context| async move {
1457 let mut signal = context.stopped();
1458 let value = (&mut signal).await.unwrap();
1459 assert_eq!(value, kill);
1460 drop(signal);
1461 });
1462
1463 let result = context.clone().stop(kill, None).await;
1465 assert!(result.is_ok());
1466
1467 let after = context
1469 .with_label("after")
1470 .spawn(move |context| async move {
1471 let value = context.stopped().await.unwrap();
1473 assert_eq!(value, kill);
1474 });
1475
1476 let result = join!(before, after);
1478 assert!(result.0.is_ok());
1479 assert!(result.1.is_ok());
1480 });
1481 }
1482
1483 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1484 where
1485 R::Context: Spawner + Metrics + Clock,
1486 {
1487 let kill = 42;
1488 runner.start(|context| async move {
1489 let (started_tx, mut started_rx) = mpsc::channel(3);
1490 let counter = Arc::new(AtomicU32::new(0));
1491
1492 let task = |cleanup_duration: Duration| {
1495 let context = context.clone();
1496 let counter = counter.clone();
1497 let started_tx = started_tx.clone();
1498 context.spawn(move |context| async move {
1499 let mut signal = context.stopped();
1501 started_tx.send(()).await.unwrap();
1502
1503 let value = (&mut signal).await.unwrap();
1505 assert_eq!(value, kill);
1506 context.sleep(cleanup_duration).await;
1507 counter.fetch_add(1, Ordering::SeqCst);
1508
1509 drop(signal);
1511 })
1512 };
1513
1514 let task1 = task(Duration::from_millis(10));
1515 let task2 = task(Duration::from_millis(20));
1516 let task3 = task(Duration::from_millis(30));
1517
1518 for _ in 0..3 {
1520 started_rx.recv().await.unwrap();
1521 }
1522
1523 context.stop(kill, None).await.unwrap();
1525 assert_eq!(counter.load(Ordering::SeqCst), 3);
1526
1527 let result = join!(task1, task2, task3);
1529 assert!(result.0.is_ok());
1530 assert!(result.1.is_ok());
1531 assert!(result.2.is_ok());
1532 });
1533 }
1534
1535 fn test_shutdown_timeout<R: Runner>(runner: R)
1536 where
1537 R::Context: Spawner + Metrics + Clock,
1538 {
1539 let kill = 42;
1540 runner.start(|context| async move {
1541 let (started_tx, started_rx) = oneshot::channel();
1543
1544 context.clone().spawn(move |context| async move {
1546 let signal = context.stopped();
1547 started_tx.send(()).unwrap();
1548 pending::<()>().await;
1549 signal.await.unwrap();
1550 });
1551
1552 started_rx.await.unwrap();
1554 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1555
1556 assert!(matches!(result, Err(Error::Timeout)));
1558 });
1559 }
1560
1561 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1562 where
1563 R::Context: Spawner + Metrics + Clock,
1564 {
1565 let kill1 = 42;
1566 let kill2 = 43;
1567
1568 runner.start(|context| async move {
1569 let (started_tx, started_rx) = oneshot::channel();
1570 let counter = Arc::new(AtomicU32::new(0));
1571
1572 let task = context.with_label("blocking_task").spawn({
1574 let counter = counter.clone();
1575 move |context| async move {
1576 let mut signal = context.stopped();
1578 started_tx.send(()).unwrap();
1579
1580 let value = (&mut signal).await.unwrap();
1582 assert_eq!(value, kill1);
1583 context.sleep(Duration::from_millis(50)).await;
1584
1585 counter.fetch_add(1, Ordering::SeqCst);
1587 drop(signal);
1588 }
1589 });
1590
1591 started_rx.await.unwrap();
1593
1594 let stop_task1 = context.clone().stop(kill1, None);
1597 pin_mut!(stop_task1);
1598 let stop_task2 = context.clone().stop(kill2, None);
1599 pin_mut!(stop_task2);
1600
1601 assert!(stop_task1.as_mut().now_or_never().is_none());
1603 assert!(stop_task2.as_mut().now_or_never().is_none());
1604
1605 assert!(stop_task1.await.is_ok());
1607 assert!(stop_task2.await.is_ok());
1608
1609 let sig = context.stopped().await;
1611 assert_eq!(sig.unwrap(), kill1);
1612
1613 let result = task.await;
1615 assert!(result.is_ok());
1616 assert_eq!(counter.load(Ordering::SeqCst), 1);
1617
1618 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1620 });
1621 }
1622
1623 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1624 where
1625 R::Context: Spawner + Metrics,
1626 {
1627 runner.start(|context| async move {
1628 context
1630 .with_label("before")
1631 .spawn(move |context| async move {
1632 let mut signal = context.stopped();
1633 let value = (&mut signal).await.unwrap();
1634
1635 assert_eq!(value, 42);
1637 drop(signal);
1638 });
1639
1640 reschedule().await;
1642 });
1643 }
1644
1645 fn test_spawn_dedicated<R: Runner>(runner: R)
1646 where
1647 R::Context: Spawner,
1648 {
1649 runner.start(|context| async move {
1650 let handle = context.dedicated().spawn(|_| async move { 42 });
1651 assert!(matches!(handle.await, Ok(42)));
1652 });
1653 }
1654
1655 fn test_spawn<R: Runner>(runner: R)
1656 where
1657 R::Context: Spawner + Clock,
1658 {
1659 runner.start(|context| async move {
1660 let child_handle = Arc::new(Mutex::new(None));
1661 let child_handle2 = child_handle.clone();
1662
1663 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1664 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1665 let parent_handle = context.spawn(move |context| async move {
1666 let handle = context.spawn(|_| async {});
1668
1669 *child_handle2.lock().unwrap() = Some(handle);
1671
1672 parent_initialized_tx.send(()).unwrap();
1673
1674 parent_complete_rx.await.unwrap();
1676 });
1677
1678 parent_initialized_rx.await.unwrap();
1680
1681 let child_handle = child_handle.lock().unwrap().take().unwrap();
1683 assert!(child_handle.await.is_ok());
1684
1685 parent_complete_tx.send(()).unwrap();
1687
1688 assert!(parent_handle.await.is_ok());
1690 });
1691 }
1692
1693 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1694 where
1695 R::Context: Spawner + Clock,
1696 {
1697 runner.start(|context| async move {
1698 let child_handle = Arc::new(Mutex::new(None));
1699 let child_handle2 = child_handle.clone();
1700
1701 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1702 let parent_handle = context.spawn(move |context| async move {
1703 let handle = context.spawn(|_| pending::<()>());
1705
1706 *child_handle2.lock().unwrap() = Some(handle);
1708
1709 parent_initialized_tx.send(()).unwrap();
1710
1711 pending::<()>().await
1713 });
1714
1715 parent_initialized_rx.await.unwrap();
1717
1718 parent_handle.abort();
1720 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1721
1722 let child_handle = child_handle.lock().unwrap().take().unwrap();
1724 assert!(matches!(child_handle.await, Err(Error::Closed)));
1725 });
1726 }
1727
1728 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1729 where
1730 R::Context: Spawner + Clock,
1731 {
1732 runner.start(|context| async move {
1733 let child_handle = Arc::new(Mutex::new(None));
1734 let child_handle2 = child_handle.clone();
1735
1736 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1737 let parent_handle = context.spawn(move |context| async move {
1738 let handle = context.spawn(|_| pending::<()>());
1740
1741 *child_handle2.lock().unwrap() = Some(handle);
1743
1744 parent_complete_rx.await.unwrap();
1746 });
1747
1748 parent_complete_tx.send(()).unwrap();
1750
1751 assert!(parent_handle.await.is_ok());
1753
1754 let child_handle = child_handle.lock().unwrap().take().unwrap();
1756 assert!(matches!(child_handle.await, Err(Error::Closed)));
1757 });
1758 }
1759
1760 fn test_spawn_cascading_abort<R: Runner>(runner: R)
1761 where
1762 R::Context: Spawner + Clock,
1763 {
1764 runner.start(|context| async move {
1765 let c0 = context.clone();
1775 let g0 = c0.clone();
1776 let g1 = c0.clone();
1777 let c1 = context.clone();
1778 let g2 = c1.clone();
1779 let g3 = c1.clone();
1780 let c2 = context.clone();
1781 let g4 = c2.clone();
1782 let g5 = c2.clone();
1783
1784 let handles = Arc::new(Mutex::new(Vec::new()));
1786 let (initialized_tx, mut initialized_rx) = mpsc::channel(9);
1787 let root_task = context.spawn({
1788 let handles = handles.clone();
1789 move |_| async move {
1790 for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1791 {
1792 let handle = context.spawn({
1793 let handles = handles.clone();
1794 let initialized_tx = initialized_tx.clone();
1795 move |_| async move {
1796 for grandchild in grandchildren {
1797 let handle = grandchild.spawn(|_| async {
1798 pending::<()>().await;
1799 });
1800 handles.lock().unwrap().push(handle);
1801 initialized_tx.send(()).await.unwrap();
1802 }
1803
1804 pending::<()>().await;
1805 }
1806 });
1807 handles.lock().unwrap().push(handle);
1808 initialized_tx.send(()).await.unwrap();
1809 }
1810
1811 pending::<()>().await;
1812 }
1813 });
1814
1815 for _ in 0..9 {
1817 initialized_rx.recv().await.unwrap();
1818 }
1819
1820 assert_eq!(handles.lock().unwrap().len(), 9);
1822
1823 root_task.abort();
1825 assert!(matches!(root_task.await, Err(Error::Closed)));
1826
1827 let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1829 for handle in handles {
1830 assert!(matches!(handle.await, Err(Error::Closed)));
1831 }
1832 });
1833 }
1834
1835 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1836 where
1837 R::Context: Spawner + Clock,
1838 {
1839 runner.start(|context| async move {
1840 let (child_started_tx, child_started_rx) = oneshot::channel();
1841 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1842 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1843 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1844 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1845 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1846 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1847
1848 let parent = context.spawn(move |context| async move {
1849 let child_handle = context.clone().spawn(|_| async move {
1851 child_started_tx.send(()).unwrap();
1852 child_complete_rx.await.unwrap();
1854 });
1855 assert!(
1856 child_handle_tx.send(child_handle).is_ok(),
1857 "child handle receiver dropped"
1858 );
1859
1860 let sibling_handle = context.clone().spawn(move |_| async move {
1862 sibling_started_tx.send(()).unwrap();
1863 sibling_complete_rx.await.unwrap();
1865 });
1866 assert!(
1867 sibling_handle_tx.send(sibling_handle).is_ok(),
1868 "sibling handle receiver dropped"
1869 );
1870
1871 parent_complete_rx.await.unwrap();
1873 });
1874
1875 child_started_rx.await.unwrap();
1877 sibling_started_rx.await.unwrap();
1878
1879 sibling_complete_tx.send(()).unwrap();
1881 assert!(sibling_handle_rx.await.is_ok());
1882
1883 child_complete_tx.send(()).unwrap();
1885 assert!(child_handle_rx.await.is_ok());
1886
1887 parent_complete_tx.send(()).unwrap();
1889 assert!(parent.await.is_ok());
1890 });
1891 }
1892
1893 fn test_spawn_clone_chain<R: Runner>(runner: R)
1894 where
1895 R::Context: Spawner + Clock,
1896 {
1897 runner.start(|context| async move {
1898 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1899 let (child_started_tx, child_started_rx) = oneshot::channel();
1900 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1901 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1902 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1903
1904 let parent = context.clone().spawn({
1905 move |context| async move {
1906 let child = context.clone().spawn({
1907 move |context| async move {
1908 let grandchild = context.clone().spawn({
1909 move |_| async move {
1910 grandchild_started_tx.send(()).unwrap();
1911 pending::<()>().await;
1912 }
1913 });
1914 assert!(
1915 grandchild_handle_tx.send(grandchild).is_ok(),
1916 "grandchild handle receiver dropped"
1917 );
1918 child_started_tx.send(()).unwrap();
1919 pending::<()>().await;
1920 }
1921 });
1922 assert!(
1923 child_handle_tx.send(child).is_ok(),
1924 "child handle receiver dropped"
1925 );
1926 parent_started_tx.send(()).unwrap();
1927 pending::<()>().await;
1928 }
1929 });
1930
1931 parent_started_rx.await.unwrap();
1932 child_started_rx.await.unwrap();
1933 grandchild_started_rx.await.unwrap();
1934
1935 let child_handle = child_handle_rx.await.unwrap();
1936 let grandchild_handle = grandchild_handle_rx.await.unwrap();
1937
1938 parent.abort();
1939 assert!(parent.await.is_err());
1940
1941 assert!(child_handle.await.is_err());
1942 assert!(grandchild_handle.await.is_err());
1943 });
1944 }
1945
1946 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1947 where
1948 R::Context: Spawner + Clock,
1949 {
1950 runner.start(|context| async move {
1951 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1952 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1953
1954 let parent = context.clone().spawn({
1955 move |context| async move {
1956 let clone1 = context.clone();
1957 let clone2 = clone1.clone();
1958 let clone3 = clone2.clone();
1959
1960 let leaf = clone3.clone().spawn({
1961 move |_| async move {
1962 leaf_started_tx.send(()).unwrap();
1963 pending::<()>().await;
1964 }
1965 });
1966
1967 leaf_handle_tx
1968 .send(leaf)
1969 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1970 pending::<()>().await;
1971 }
1972 });
1973
1974 leaf_started_rx.await.unwrap();
1975 let leaf_handle = leaf_handle_rx.await.unwrap();
1976
1977 parent.abort();
1978 assert!(parent.await.is_err());
1979 assert!(leaf_handle.await.is_err());
1980 });
1981 }
1982
1983 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1984 where
1985 R::Context: Spawner,
1986 {
1987 runner.start(|context| async move {
1988 let context = if dedicated {
1989 context.dedicated()
1990 } else {
1991 context.shared(true)
1992 };
1993
1994 let handle = context.spawn(|_| async move { 42 });
1995 let result = handle.await;
1996 assert!(matches!(result, Ok(42)));
1997 });
1998 }
1999
2000 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
2001 where
2002 R::Context: Spawner + Clock,
2003 {
2004 runner.start(|context| async move {
2005 let context = if dedicated {
2006 context.dedicated()
2007 } else {
2008 context.shared(true)
2009 };
2010
2011 context.clone().spawn(|_| async move {
2012 panic!("blocking task panicked");
2013 });
2014
2015 loop {
2017 context.sleep(Duration::from_millis(100)).await;
2018 }
2019 });
2020 }
2021
2022 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
2023 where
2024 R::Context: Spawner + Clock,
2025 {
2026 let result: Result<(), Error> = runner.start(|context| async move {
2027 let context = if dedicated {
2028 context.dedicated()
2029 } else {
2030 context.shared(true)
2031 };
2032
2033 let handle = context.clone().spawn(|_| async move {
2034 panic!("blocking task panicked");
2035 });
2036 handle.await
2037 });
2038 assert!(matches!(result, Err(Error::Exited)));
2039 }
2040
2041 fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
2042 runner.start(|_| async move {
2043 let dropper = Arc::new(());
2045 let executor = deterministic::Runner::default();
2046 executor.start({
2047 let dropper = dropper.clone();
2048 move |context| async move {
2049 let (setup_tx, mut setup_rx) = mpsc::unbounded_channel::<()>();
2051 let (tx1, mut rx1) = mpsc::unbounded_channel::<()>();
2052 let (tx2, mut rx2) = mpsc::unbounded_channel::<()>();
2053
2054 context.with_label("task1").spawn({
2056 let setup_tx = setup_tx.clone();
2057 let dropper = dropper.clone();
2058 move |_| async move {
2059 tx2.send(()).unwrap();
2061 rx1.recv().await.unwrap();
2062 setup_tx.send(()).unwrap();
2063
2064 while rx1.recv().await.is_some() {}
2066 drop(tx2);
2067 drop(dropper);
2068 }
2069 });
2070
2071 context.with_label("task2").spawn(move |_| async move {
2073 tx1.send(()).unwrap();
2075 rx2.recv().await.unwrap();
2076 setup_tx.send(()).unwrap();
2077
2078 while rx2.recv().await.is_some() {}
2080 drop(tx1);
2081 drop(dropper);
2082 });
2083
2084 setup_rx.recv().await.unwrap();
2086 setup_rx.recv().await.unwrap();
2087 }
2088 });
2089
2090 Arc::try_unwrap(dropper).expect("references remaining");
2092 });
2093 }
2094
2095 fn test_late_waker<R: Runner>(runner: R)
2096 where
2097 R::Context: Metrics + Spawner,
2098 {
2099 struct CaptureWaker {
2102 tx: Option<oneshot::Sender<Waker>>,
2103 sent: bool,
2104 }
2105 impl Future for CaptureWaker {
2106 type Output = ();
2107 fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
2108 if !self.sent {
2109 if let Some(tx) = self.tx.take() {
2110 let _ = tx.send(cx.waker().clone());
2112 }
2113 self.sent = true;
2114 }
2115 Poll::Pending
2116 }
2117 }
2118
2119 struct WakeOnDrop(Option<Waker>);
2121 impl Drop for WakeOnDrop {
2122 fn drop(&mut self) {
2123 if let Some(w) = self.0.take() {
2124 w.wake_by_ref();
2125 }
2126 }
2127 }
2128
2129 let holder = runner.start(|context| async move {
2131 let (tx, rx) = oneshot::channel::<Waker>();
2133
2134 context
2136 .with_label("capture_waker")
2137 .spawn(move |_| async move {
2138 CaptureWaker {
2139 tx: Some(tx),
2140 sent: false,
2141 }
2142 .await;
2143 });
2144
2145 utils::reschedule().await;
2147
2148 let waker = rx.await.expect("waker not received");
2150
2151 WakeOnDrop(Some(waker))
2153 });
2154
2155 drop(holder);
2158 }
2159
2160 fn test_metrics<R: Runner>(runner: R)
2161 where
2162 R::Context: Metrics,
2163 {
2164 runner.start(|context| async move {
2165 assert_eq!(context.label(), "");
2167
2168 let counter = Counter::<u64>::default();
2170 context.register("test", "test", counter.clone());
2171
2172 counter.inc();
2174
2175 let buffer = context.encode();
2177 assert!(buffer.contains("test_total 1"));
2178
2179 let context = context.with_label("nested");
2181 let nested_counter = Counter::<u64>::default();
2182 context.register("test", "test", nested_counter.clone());
2183
2184 nested_counter.inc();
2186
2187 let buffer = context.encode();
2189 assert!(buffer.contains("nested_test_total 1"));
2190 assert!(buffer.contains("test_total 1"));
2191 });
2192 }
2193
2194 fn test_metrics_with_attribute<R: Runner>(runner: R)
2195 where
2196 R::Context: Metrics,
2197 {
2198 runner.start(|context| async move {
2199 let ctx_epoch5 = context
2201 .with_label("consensus")
2202 .with_attribute("epoch", "e5");
2203
2204 let counter = Counter::<u64>::default();
2206 ctx_epoch5.register("votes", "vote count", counter.clone());
2207 counter.inc();
2208
2209 let buffer = context.encode();
2211 assert!(
2212 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2213 "Expected metric with epoch attribute, got: {}",
2214 buffer
2215 );
2216
2217 let ctx_epoch6 = context
2219 .with_label("consensus")
2220 .with_attribute("epoch", "e6");
2221 let counter2 = Counter::<u64>::default();
2222 ctx_epoch6.register("votes", "vote count", counter2.clone());
2223 counter2.inc();
2224 counter2.inc();
2225
2226 let buffer = context.encode();
2228 assert!(
2229 buffer.contains("consensus_votes_total{epoch=\"e5\"} 1"),
2230 "Expected metric with epoch=e5, got: {}",
2231 buffer
2232 );
2233 assert!(
2234 buffer.contains("consensus_votes_total{epoch=\"e6\"} 2"),
2235 "Expected metric with epoch=e6, got: {}",
2236 buffer
2237 );
2238
2239 assert_eq!(
2241 buffer.matches("# HELP consensus_votes").count(),
2242 1,
2243 "HELP should appear exactly once, got: {}",
2244 buffer
2245 );
2246 assert_eq!(
2247 buffer.matches("# TYPE consensus_votes").count(),
2248 1,
2249 "TYPE should appear exactly once, got: {}",
2250 buffer
2251 );
2252
2253 let ctx_multi = context
2255 .with_label("engine")
2256 .with_attribute("region", "us")
2257 .with_attribute("instance", "i1");
2258 let counter3 = Counter::<u64>::default();
2259 ctx_multi.register("requests", "request count", counter3.clone());
2260 counter3.inc();
2261
2262 let buffer = context.encode();
2263 assert!(
2264 buffer.contains("engine_requests_total{instance=\"i1\",region=\"us\"} 1"),
2265 "Expected metric with sorted attributes, got: {}",
2266 buffer
2267 );
2268 });
2269 }
2270
/// Runs the shared `test_metrics_with_attribute` scenario on the deterministic runtime.
#[test]
fn test_deterministic_metrics_with_attribute() {
    test_metrics_with_attribute(deterministic::Runner::default());
}
2276
/// Runs the shared `test_metrics_with_attribute` scenario on the tokio runtime.
#[test]
fn test_tokio_metrics_with_attribute() {
    test_metrics_with_attribute(tokio::Runner::default());
}
2282
2283 fn test_metrics_attribute_with_nested_label<R: Runner>(runner: R)
2284 where
2285 R::Context: Metrics,
2286 {
2287 runner.start(|context| async move {
2288 let ctx = context
2290 .with_label("orchestrator")
2291 .with_attribute("epoch", "e5")
2292 .with_label("engine");
2293
2294 let counter = Counter::<u64>::default();
2296 ctx.register("votes", "vote count", counter.clone());
2297 counter.inc();
2298
2299 let buffer = context.encode();
2301 assert!(
2302 buffer.contains("orchestrator_engine_votes_total{epoch=\"e5\"} 1"),
2303 "Expected metric with preserved epoch attribute, got: {}",
2304 buffer
2305 );
2306
2307 let ctx2 = context
2309 .with_label("outer")
2310 .with_attribute("region", "us")
2311 .with_label("middle")
2312 .with_attribute("az", "east")
2313 .with_label("inner");
2314
2315 let counter2 = Counter::<u64>::default();
2316 ctx2.register("requests", "request count", counter2.clone());
2317 counter2.inc();
2318 counter2.inc();
2319
2320 let buffer = context.encode();
2321 assert!(
2322 buffer.contains("outer_middle_inner_requests_total{az=\"east\",region=\"us\"} 2"),
2323 "Expected metric with all attributes preserved and sorted, got: {}",
2324 buffer
2325 );
2326 });
2327 }
2328
/// Runs the shared `test_metrics_attribute_with_nested_label` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(deterministic::Runner::default());
}
2334
/// Runs the shared `test_metrics_attribute_with_nested_label` scenario on the tokio
/// runtime.
#[test]
fn test_tokio_metrics_attribute_with_nested_label() {
    test_metrics_attribute_with_nested_label(tokio::Runner::default());
}
2340
2341 fn test_metrics_attributes_isolated_between_contexts<R: Runner>(runner: R)
2342 where
2343 R::Context: Metrics,
2344 {
2345 runner.start(|context| async move {
2346 let ctx_a = context.with_label("component_a").with_attribute("epoch", 1);
2348 let ctx_b = context.with_label("component_b").with_attribute("epoch", 2);
2349
2350 let c1 = Counter::<u64>::default();
2352 ctx_a.register("requests", "help", c1);
2353
2354 let c2 = Counter::<u64>::default();
2356 ctx_b.register("requests", "help", c2);
2357
2358 let c3 = Counter::<u64>::default();
2360 ctx_a.register("errors", "help", c3);
2361
2362 let output = context.encode();
2363
2364 assert!(
2366 output.contains("component_a_requests_total{epoch=\"1\"} 0"),
2367 "ctx_a requests should have epoch=1: {output}"
2368 );
2369 assert!(
2370 output.contains("component_a_errors_total{epoch=\"1\"} 0"),
2371 "ctx_a errors should have epoch=1: {output}"
2372 );
2373 assert!(
2374 !output.contains("component_a_requests_total{epoch=\"2\"}"),
2375 "ctx_a requests should not have epoch=2: {output}"
2376 );
2377
2378 assert!(
2380 output.contains("component_b_requests_total{epoch=\"2\"} 0"),
2381 "ctx_b should have epoch=2: {output}"
2382 );
2383 assert!(
2384 !output.contains("component_b_requests_total{epoch=\"1\"}"),
2385 "ctx_b should not have epoch=1: {output}"
2386 );
2387 });
2388 }
2389
/// Runs the shared `test_metrics_attributes_isolated_between_contexts` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(deterministic::Runner::default());
}
2395
/// Runs the shared `test_metrics_attributes_isolated_between_contexts` scenario on the
/// tokio runtime.
#[test]
fn test_tokio_metrics_attributes_isolated_between_contexts() {
    test_metrics_attributes_isolated_between_contexts(tokio::Runner::default());
}
2401
2402 fn test_metrics_attributes_sorted_deterministically<R: Runner>(runner: R)
2403 where
2404 R::Context: Metrics,
2405 {
2406 runner.start(|context| async move {
2407 let ctx_ab = context
2409 .with_label("service")
2410 .with_attribute("region", "us")
2411 .with_attribute("env", "prod");
2412
2413 let ctx_ba = context
2414 .with_label("service")
2415 .with_attribute("env", "prod")
2416 .with_attribute("region", "us");
2417
2418 let c1 = Counter::<u64>::default();
2420 ctx_ab.register("requests", "help", c1.clone());
2421 c1.inc();
2422
2423 let c2 = Counter::<u64>::default();
2425 ctx_ba.register("errors", "help", c2.clone());
2426 c2.inc();
2427 c2.inc();
2428
2429 let output = context.encode();
2430
2431 assert!(
2433 output.contains("service_requests_total{env=\"prod\",region=\"us\"} 1"),
2434 "requests should have sorted labels: {output}"
2435 );
2436 assert!(
2437 output.contains("service_errors_total{env=\"prod\",region=\"us\"} 2"),
2438 "errors should have sorted labels: {output}"
2439 );
2440
2441 assert!(
2443 !output.contains("region=\"us\",env=\"prod\""),
2444 "should not have unsorted label order: {output}"
2445 );
2446 });
2447 }
2448
/// Runs the shared `test_metrics_attributes_sorted_deterministically` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(deterministic::Runner::default());
}
2454
/// Runs the shared `test_metrics_attributes_sorted_deterministically` scenario on the
/// tokio runtime.
#[test]
fn test_tokio_metrics_attributes_sorted_deterministically() {
    test_metrics_attributes_sorted_deterministically(tokio::Runner::default());
}
2460
2461 fn test_metrics_nested_labels_with_attributes<R: Runner>(runner: R)
2462 where
2463 R::Context: Metrics,
2464 {
2465 runner.start(|context| async move {
2466 let svc_a = context.with_label("service_a");
2468
2469 let svc_a_v2 = context.with_label("service_a").with_attribute("version", 2);
2471
2472 let svc_b_worker = context.with_label("service_b").with_label("worker");
2474
2475 let svc_b_worker_shard = context
2477 .with_label("service_b")
2478 .with_label("worker")
2479 .with_attribute("shard", 99);
2480
2481 let svc_b_manager = context.with_label("service_b").with_label("manager");
2483
2484 let svc_c = context.with_label("service_c");
2486
2487 let c1 = Counter::<u64>::default();
2489 svc_a.register("requests", "help", c1);
2490
2491 let c2 = Counter::<u64>::default();
2492 svc_a_v2.register("requests", "help", c2);
2493
2494 let c3 = Counter::<u64>::default();
2495 svc_b_worker.register("tasks", "help", c3);
2496
2497 let c4 = Counter::<u64>::default();
2498 svc_b_worker_shard.register("tasks", "help", c4);
2499
2500 let c5 = Counter::<u64>::default();
2501 svc_b_manager.register("decisions", "help", c5);
2502
2503 let c6 = Counter::<u64>::default();
2504 svc_c.register("requests", "help", c6);
2505
2506 let output = context.encode();
2507
2508 assert!(
2510 output.contains("service_a_requests_total 0"),
2511 "svc_a plain should exist: {output}"
2512 );
2513 assert!(
2514 output.contains("service_a_requests_total{version=\"2\"} 0"),
2515 "svc_a_v2 should have version=2: {output}"
2516 );
2517
2518 assert!(
2520 output.contains("service_b_worker_tasks_total 0"),
2521 "svc_b_worker plain should exist: {output}"
2522 );
2523 assert!(
2524 output.contains("service_b_worker_tasks_total{shard=\"99\"} 0"),
2525 "svc_b_worker_shard should have shard=99: {output}"
2526 );
2527
2528 assert!(
2530 output.contains("service_b_manager_decisions_total 0"),
2531 "svc_b_manager should have no attributes: {output}"
2532 );
2533 assert!(
2534 !output.contains("service_b_manager_decisions_total{"),
2535 "svc_b_manager should have no attributes at all: {output}"
2536 );
2537
2538 assert!(
2540 output.contains("service_c_requests_total 0"),
2541 "svc_c should have no attributes: {output}"
2542 );
2543 assert!(
2544 !output.contains("service_c_requests_total{"),
2545 "svc_c should have no attributes at all: {output}"
2546 );
2547
2548 assert!(
2550 !output.contains("service_b_manager_decisions_total{shard="),
2551 "svc_b_manager should not have shard: {output}"
2552 );
2553 assert!(
2554 !output.contains("service_a_requests_total{shard="),
2555 "svc_a should not have shard: {output}"
2556 );
2557 assert!(
2558 !output.contains("service_c_requests_total{version="),
2559 "svc_c should not have version: {output}"
2560 );
2561 });
2562 }
2563
/// Runs the shared `test_metrics_nested_labels_with_attributes` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(deterministic::Runner::default());
}
2569
/// Runs the shared `test_metrics_nested_labels_with_attributes` scenario on the tokio
/// runtime.
#[test]
fn test_tokio_metrics_nested_labels_with_attributes() {
    test_metrics_nested_labels_with_attributes(tokio::Runner::default());
}
2575
2576 fn test_metrics_family_with_attributes<R: Runner>(runner: R)
2577 where
2578 R::Context: Metrics,
2579 {
2580 runner.start(|context| async move {
2581 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
2582 struct RequestLabels {
2583 method: String,
2584 status: u16,
2585 }
2586
2587 let ctx = context
2589 .with_label("api")
2590 .with_attribute("region", "us_east")
2591 .with_attribute("env", "prod");
2592
2593 let requests: Family<RequestLabels, Counter<u64>> = Family::default();
2595 ctx.register("requests", "HTTP requests", requests.clone());
2596
2597 requests
2599 .get_or_create(&RequestLabels {
2600 method: "GET".to_string(),
2601 status: 200,
2602 })
2603 .inc();
2604 requests
2605 .get_or_create(&RequestLabels {
2606 method: "POST".to_string(),
2607 status: 201,
2608 })
2609 .inc();
2610 requests
2611 .get_or_create(&RequestLabels {
2612 method: "GET".to_string(),
2613 status: 404,
2614 })
2615 .inc();
2616
2617 let output = context.encode();
2618
2619 assert!(
2623 output.contains(
2624 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"200\"} 1"
2625 ),
2626 "GET 200 should have merged labels: {output}"
2627 );
2628 assert!(
2629 output.contains(
2630 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"POST\",status=\"201\"} 1"
2631 ),
2632 "POST 201 should have merged labels: {output}"
2633 );
2634 assert!(
2635 output.contains(
2636 "api_requests_total{env=\"prod\",region=\"us_east\",method=\"GET\",status=\"404\"} 1"
2637 ),
2638 "GET 404 should have merged labels: {output}"
2639 );
2640
2641 let ctx_plain = context.with_label("api_plain");
2643 let plain_requests: Family<RequestLabels, Counter<u64>> = Family::default();
2644 ctx_plain.register("requests", "HTTP requests", plain_requests.clone());
2645
2646 plain_requests
2647 .get_or_create(&RequestLabels {
2648 method: "DELETE".to_string(),
2649 status: 204,
2650 })
2651 .inc();
2652
2653 let output = context.encode();
2654
2655 assert!(
2657 output.contains("api_plain_requests_total{method=\"DELETE\",status=\"204\"} 1"),
2658 "plain DELETE should have only family labels: {output}"
2659 );
2660 assert!(
2661 !output.contains("api_plain_requests_total{env="),
2662 "plain should not have env attribute: {output}"
2663 );
2664 assert!(
2665 !output.contains("api_plain_requests_total{region="),
2666 "plain should not have region attribute: {output}"
2667 );
2668 });
2669 }
2670
/// Runs the shared `test_metrics_family_with_attributes` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(deterministic::Runner::default());
}
2676
/// Runs the shared `test_metrics_family_with_attributes` scenario on the tokio runtime.
#[test]
fn test_tokio_metrics_family_with_attributes() {
    test_metrics_family_with_attributes(tokio::Runner::default());
}
2682
/// Runs the shared `test_error_future` scenario on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
2688
/// Runs the shared `test_clock_sleep` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
2694
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
2700
/// Runs the shared `test_clock_timeout` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_timeout() {
    test_clock_timeout(deterministic::Runner::default());
}
2706
/// Runs the shared `test_root_finishes` scenario on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
2712
/// Runs the shared `test_spawn_after_abort` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_after_abort() {
    test_spawn_after_abort(deterministic::Runner::default());
}
2718
/// Runs the shared `test_spawn_abort` scenario on the deterministic runtime with both
/// flags disabled.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default(), false, false);
}
2724
/// Runs the shared `test_panic_aborts_root` scenario on the deterministic runtime; the
/// test is expected to panic with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
2731
/// Runs the shared `test_panic_aborts_root` scenario on a deterministic runtime configured
/// with `with_catch_panics(true)`; the test is still expected to panic with "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
2739
/// Runs the shared `test_panic_aborts_spawn` scenario on the deterministic runtime; the
/// test is expected to panic with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
2746
/// Runs the shared `test_panic_aborts_spawn_caught` scenario on a deterministic runtime
/// configured with `with_catch_panics(true)`.
#[test]
fn test_deterministic_panic_aborts_spawn_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
2753
/// Runs the shared `test_multiple_panics` scenario on the deterministic runtime; the test
/// is expected to panic with a message containing "boom".
#[test]
#[should_panic(expected = "boom")]
fn test_deterministic_multiple_panics() {
    test_multiple_panics(deterministic::Runner::default());
}
2760
/// Runs the shared `test_multiple_panics_caught` scenario on a deterministic runtime
/// configured with `with_catch_panics(true)`.
#[test]
fn test_deterministic_multiple_panics_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
2767
/// Runs the shared `test_select` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
2773
/// Runs the shared `test_select_loop` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
2779
/// Runs the shared `test_storage_operations` scenario on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
2785
/// Runs the shared `test_blob_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
2791
/// Runs the shared `test_blob_resize` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
2797
/// Runs the shared `test_many_partition_read_write` scenario on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
2803
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
2809
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
2816
/// Runs the shared `test_shutdown` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
2822
/// Runs the shared `test_shutdown_multiple_signals` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
2828
/// Runs the shared `test_shutdown_timeout` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
2834
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
2840
/// Runs the shared `test_unfulfilled_shutdown` scenario on the deterministic runtime.
#[test]
fn test_deterministic_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(deterministic::Runner::default());
}
2846
/// Runs the shared `test_spawn_dedicated` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_dedicated() {
    test_spawn_dedicated(deterministic::Runner::default());
}
2852
/// Runs the shared `test_spawn` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn() {
    test_spawn(deterministic::Runner::default());
}
2858
/// Runs the shared `test_spawn_abort_on_parent_abort` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(deterministic::Runner::default());
}
2864
/// Runs the shared `test_spawn_abort_on_parent_completion` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(deterministic::Runner::default());
}
2870
/// Runs the shared `test_spawn_cascading_abort` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_cascading_abort() {
    test_spawn_cascading_abort(deterministic::Runner::default());
}
2876
/// Runs the shared `test_child_survives_sibling_completion` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(deterministic::Runner::default());
}
2882
/// Runs the shared `test_spawn_clone_chain` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_clone_chain() {
    test_spawn_clone_chain(deterministic::Runner::default());
}
2888
/// Runs the shared `test_spawn_sparse_clone_chain` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(deterministic::Runner::default());
}
2894
/// Runs the shared `test_spawn_blocking` scenario on a fresh deterministic runtime for
/// both the shared (`false`) and dedicated (`true`) configurations.
#[test]
fn test_deterministic_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    }
}
2902
/// Runs the shared `test_spawn_blocking_panic` scenario on the deterministic runtime for
/// both the shared and the dedicated configuration.
///
/// A plain loop under `#[should_panic]` would end at the first iteration's panic, leaving
/// the dedicated configuration unexercised. The shared configuration is therefore run
/// under `catch_unwind`, and only the dedicated run is left to propagate its panic and
/// satisfy `should_panic`.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    // Shared configuration: the panic is caught so the test can continue.
    let shared = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        test_spawn_blocking_panic(deterministic::Runner::default(), false);
    }));
    // This message intentionally does not contain the expected panic text, so a missing
    // panic in the shared configuration fails the test.
    assert!(shared.is_err(), "shared configuration did not panic");

    // Dedicated configuration: the panic propagates out of the test.
    test_spawn_blocking_panic(deterministic::Runner::default(), true);
}
2911
/// Runs the shared `test_spawn_blocking_panic_caught` scenario for both the shared and the
/// dedicated configuration, each on a fresh deterministic runtime configured with
/// `with_catch_panics(true)`.
#[test]
fn test_deterministic_spawn_blocking_panic_caught() {
    for dedicated in [false, true] {
        let runner = deterministic::Runner::new(
            deterministic::Config::default().with_catch_panics(true),
        );
        test_spawn_blocking_panic_caught(runner, dedicated);
    }
}
2920
/// Runs the shared `test_spawn_abort` scenario on a fresh deterministic runtime for the
/// flag pairs `(dedicated = false, blocking = true)` and `(dedicated = true, blocking =
/// false)`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    let flag_pairs = [(false, true), (true, false)];
    for (dedicated, blocking) in flag_pairs {
        test_spawn_abort(deterministic::Runner::default(), dedicated, blocking);
    }
}
2928
/// Runs the shared `test_circular_reference_prevents_cleanup` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(deterministic::Runner::default());
}
2934
/// Runs the shared `test_late_waker` scenario on the deterministic runtime.
#[test]
fn test_deterministic_late_waker() {
    test_late_waker(deterministic::Runner::default());
}
2940
/// Drives the shared `test_metrics` scenario with a default deterministic runner.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
2946
2947 #[test_collect_traces]
2948 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2949 let executor = deterministic::Runner::new(deterministic::Config::default());
2950 executor.start(|context| async move {
2951 context
2952 .with_label("test")
2953 .instrumented()
2954 .spawn(|context| async move {
2955 tracing::info!(field = "test field", "test log");
2956
2957 context
2958 .with_label("inner")
2959 .instrumented()
2960 .spawn(|_| async move {
2961 tracing::info!("inner log");
2962 })
2963 .await
2964 .unwrap();
2965 })
2966 .await
2967 .unwrap();
2968 });
2969
2970 let info_traces = traces.get_by_level(Level::INFO);
2971 assert_eq!(info_traces.len(), 2);
2972
2973 info_traces
2975 .expect_event_at_index(0, |event| {
2976 event.metadata.expect_content_exact("test log")?;
2977 event.metadata.expect_field_count(1)?;
2978 event.metadata.expect_field_exact("field", "test field")?;
2979 event.expect_span_count(1)?;
2980 event.expect_span_at_index(0, |span| {
2981 span.expect_content_exact("task")?;
2982 span.expect_field_count(1)?;
2983 span.expect_field_exact("name", "test")
2984 })
2985 })
2986 .unwrap();
2987
2988 info_traces
2989 .expect_event_at_index(1, |event| {
2990 event.metadata.expect_content_exact("inner log")?;
2991 event.metadata.expect_field_count(0)?;
2992 event.expect_span_count(1)?;
2993 event.expect_span_at_index(0, |span| {
2994 span.expect_content_exact("task")?;
2995 span.expect_field_count(1)?;
2996 span.expect_field_exact("name", "test_inner")
2997 })
2998 })
2999 .unwrap();
3000 }
3001
/// Exercises the deterministic resolver: a registered host resolves to the registered
/// addresses, an unregistered host fails with `Error::ResolveFailed`, and a host
/// re-registered with `None` fails with `Error::ResolveFailed` as well.
#[test]
fn test_deterministic_resolver() {
    deterministic::Runner::default().start(|context| async move {
        // Register two addresses for the host.
        let first: IpAddr = "192.168.1.1".parse().unwrap();
        let second: IpAddr = "192.168.1.2".parse().unwrap();
        context.resolver_register("example.com", Some(vec![first, second]));

        // The registered host resolves to both addresses, in order.
        let resolved = context.resolve("example.com").await.unwrap();
        assert_eq!(resolved, vec![first, second]);

        // A host that was never registered does not resolve.
        let unknown = context.resolve("unknown.com").await;
        assert!(matches!(unknown, Err(Error::ResolveFailed(_))));

        // After registering `None` for the host, it no longer resolves.
        context.resolver_register("example.com", None);
        let cleared = context.resolve("example.com").await;
        assert!(matches!(cleared, Err(Error::ResolveFailed(_))));
    });
}
3025
/// Drives the shared `test_error_future` scenario with a default tokio runner.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
3031
/// Drives the shared `test_clock_sleep` scenario with a default tokio runner.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
3037
/// Drives the shared `test_clock_sleep_until` scenario with a default tokio runner.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
3043
/// Drives the shared `test_clock_timeout` scenario with a default tokio runner.
#[test]
fn test_tokio_clock_timeout() {
    test_clock_timeout(tokio::Runner::default());
}
3049
/// Drives the shared `test_root_finishes` scenario with a default tokio runner.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
3055
/// Drives the shared `test_spawn_after_abort` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn_after_abort() {
    test_spawn_after_abort(tokio::Runner::default());
}
3061
/// Drives the shared `test_spawn_abort` scenario with a default tokio runner, passing
/// `false` for both of its flags.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default(), false, false);
}
3067
/// Drives the shared `test_panic_aborts_root` scenario with a default tokio runner and
/// expects a panic whose message contains "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
3074
/// Drives the shared `test_panic_aborts_root` scenario with a tokio runner configured with
/// `with_catch_panics(true)`; a panic whose message contains "blah" is still expected.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root_caught() {
    let executor = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_root(executor);
}
3082
/// Drives the shared `test_panic_aborts_spawn` scenario with a default tokio runner and
/// expects a panic whose message contains "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
3089
/// Drives the shared `test_panic_aborts_spawn_caught` scenario with a tokio runner
/// configured with `with_catch_panics(true)`.
#[test]
fn test_tokio_panic_aborts_spawn_caught() {
    let executor = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(executor);
}
3096
/// Drives the shared `test_multiple_panics` scenario with a default tokio runner and
/// expects a panic whose message contains "boom".
#[test]
#[should_panic(expected = "boom")]
fn test_tokio_multiple_panics() {
    test_multiple_panics(tokio::Runner::default());
}
3103
/// Drives the shared `test_multiple_panics_caught` scenario with a tokio runner configured
/// with `with_catch_panics(true)`.
#[test]
fn test_tokio_multiple_panics_caught() {
    let executor = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(executor);
}
3110
/// Drives the shared `test_select` scenario with a default tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
3116
/// Drives the shared `test_select_loop` scenario with a default tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
3122
/// Drives the shared `test_storage_operations` scenario with a default tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
3128
/// Drives the shared `test_blob_read_write` scenario with a default tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
3134
/// Drives the shared `test_blob_resize` scenario with a default tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
3140
/// Drives the shared `test_many_partition_read_write` scenario with a default tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
3146
/// Drives the shared `test_blob_read_past_length` scenario with a default tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
3152
/// Drives the shared `test_blob_clone_and_concurrent_read` scenario with a default tokio
/// runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
3159
/// Drives the shared `test_shutdown` scenario with a default tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
3165
/// Drives the shared `test_shutdown_multiple_signals` scenario with a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
3171
/// Drives the shared `test_shutdown_timeout` scenario with a default tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
3177
/// Drives the shared `test_shutdown_multiple_stop_calls` scenario with a default tokio
/// runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
3183
/// Drives the shared `test_unfulfilled_shutdown` scenario with a default tokio runner.
#[test]
fn test_tokio_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(tokio::Runner::default());
}
3189
/// Drives the shared `test_spawn_dedicated` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn_dedicated() {
    test_spawn_dedicated(tokio::Runner::default());
}
3195
/// Drives the shared `test_spawn` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn() {
    test_spawn(tokio::Runner::default());
}
3201
/// Drives the shared `test_spawn_abort_on_parent_abort` scenario with a default tokio
/// runner.
#[test]
fn test_tokio_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(tokio::Runner::default());
}
3207
/// Drives the shared `test_spawn_abort_on_parent_completion` scenario with a default tokio
/// runner.
#[test]
fn test_tokio_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(tokio::Runner::default());
}
3213
/// Drives the shared `test_spawn_cascading_abort` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn_cascading_abort() {
    test_spawn_cascading_abort(tokio::Runner::default());
}
3219
/// Drives the shared `test_child_survives_sibling_completion` scenario with a default tokio
/// runner.
#[test]
fn test_tokio_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(tokio::Runner::default());
}
3225
/// Drives the shared `test_spawn_clone_chain` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn_clone_chain() {
    test_spawn_clone_chain(tokio::Runner::default());
}
3231
/// Drives the shared `test_spawn_sparse_clone_chain` scenario with a default tokio runner.
#[test]
fn test_tokio_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(tokio::Runner::default());
}
3237
/// Runs the shared `test_spawn_blocking` scenario on the tokio runtime, once with
/// `dedicated = false` and once with `dedicated = true`.
#[test]
fn test_tokio_spawn_blocking() {
    for use_dedicated in [false, true] {
        // A fresh runner is built for every configuration.
        test_spawn_blocking(tokio::Runner::default(), use_dedicated);
    }
}
3245
/// Runs the shared `test_spawn_blocking_panic` scenario on the tokio runtime with
/// `dedicated = false` and expects a panic whose message contains "blocking task panicked".
///
/// Each configuration gets its own `#[should_panic]` test: the first panic unwinds out of
/// the test body, so a loop over both values would never reach the second one.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic() {
    let executor = tokio::Runner::default();
    test_spawn_blocking_panic(executor, false);
}

/// Runs the shared `test_spawn_blocking_panic` scenario on the tokio runtime with
/// `dedicated = true` and expects a panic whose message contains "blocking task panicked".
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_tokio_spawn_blocking_panic_dedicated() {
    let executor = tokio::Runner::default();
    test_spawn_blocking_panic(executor, true);
}
3254
/// Runs the shared `test_spawn_blocking_panic_caught` scenario on a tokio runner configured
/// with `with_catch_panics(true)`, for both values of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic_caught() {
    for use_dedicated in [false, true] {
        // Build a fresh runner (and config) for every configuration.
        let executor = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
        test_spawn_blocking_panic_caught(executor, use_dedicated);
    }
}
3263
/// Runs the shared `test_spawn_abort` scenario on the tokio runtime for the
/// `(dedicated, blocking)` pairs `(false, true)` and `(true, false)`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    let configurations = [(false, true), (true, false)];
    for (use_dedicated, is_blocking) in configurations {
        test_spawn_abort(tokio::Runner::default(), use_dedicated, is_blocking);
    }
}
3271
/// Drives the shared `test_circular_reference_prevents_cleanup` scenario with a default
/// tokio runner.
#[test]
fn test_tokio_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(tokio::Runner::default());
}
3277
/// Drives the shared `test_late_waker` scenario with a default tokio runner.
#[test]
fn test_tokio_late_waker() {
    test_late_waker(tokio::Runner::default());
}
3283
/// Drives the shared `test_metrics` scenario with a default tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
3289
/// Polls the encoded metrics of a tokio runtime until an unlabelled `runtime_process_rss`
/// sample reports a value greater than zero.
///
/// The test sleeps for 100ms after every unsuccessful poll. This covers both the case
/// where the metric is not present yet and the case where it is present but still not
/// positive, so the polling task always yields to the runtime instead of spinning.
///
/// # Panics
///
/// Panics with "Failed to parse RSS value" if a matching sample's value is not an `i64`.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Consider only sample lines for the metric itself: skip labelled samples
            // (`runtime_process_rss{...}`) and lines without a value column.
            let populated = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                .filter_map(|line| line.split_whitespace().nth(1))
                .map(|value| value.parse::<i64>().expect("Failed to parse RSS value"))
                .any(|rss| rss > 0);
            if populated {
                return;
            }

            // Not reported (or not populated) yet: back off before polling again.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
3320
/// End-to-end check of the tokio telemetry endpoint.
///
/// The test calls `tokio::telemetry::init` with `127.0.0.1:8000`, registers and increments
/// a counter, then has a client task dial that address, send a raw
/// `GET /metrics HTTP/1.1` request and assert that the response is `200 OK` with a body
/// containing `test_counter_total 1`.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry with plain-text INFO logging.
        let logging = tokio::telemetry::Logging {
            level: Level::INFO,
            json: false,
        };
        tokio::telemetry::init(context.with_label("metrics"), logging, Some(address), None);

        // Register a counter and bump it once so it shows up in the scrape.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes one at a time up to (and excluding) the next `\n`, dropping a
        /// trailing `\r` if present.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut bytes = Vec::new();
            loop {
                let chunk = stream.recv(1).await?;
                let byte = chunk.coalesce().as_ref()[0];
                match byte {
                    b'\n' => break,
                    other => bytes.push(other),
                }
            }
            // Tolerate CRLF line endings.
            if bytes.last() == Some(&b'\r') {
                bytes.pop();
            }
            String::from_utf8(bytes).map_err(|_| Error::ReadFailed)
        }

        /// Reads `name: value` lines until the first empty line; lines without a
        /// `": "` separator are ignored.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    return Ok(headers);
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_string(), value.to_string());
                }
            }
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let expected_len = content_length as u64;
            let payload = stream.recv(expected_len).await?;
            String::from_utf8(payload.coalesce().into()).map_err(|_| Error::ReadFailed)
        }

        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial every 10ms until it succeeds.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok(connection) => break connection,
                        Err(e) => {
                            error!(err =?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Issue the scrape request.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request)).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers; the body length comes from `content-length`.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let length_header = headers.get("content-length").unwrap();
                let content_length = length_header.parse::<usize>().unwrap();

                // Body must contain the incremented counter.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Surface any failure from the client task.
        client_handle.await.unwrap();
    });
}
3429
/// Resolves "localhost" on the tokio runtime and checks that the result is non-empty and
/// that every returned address is the IPv4 or IPv6 loopback address.
#[test]
fn test_tokio_resolver() {
    tokio::Runner::default().start(|context| async move {
        let resolved = context.resolve("localhost").await.unwrap();
        assert!(!resolved.is_empty());

        for addr in resolved {
            assert!(
                addr == IpAddr::V4(Ipv4Addr::LOCALHOST)
                    || addr == IpAddr::V6(Ipv6Addr::LOCALHOST)
            );
        }
    });
}
3444
/// Creates a thread pool of size 4 from a tokio context and checks that a parallel sum of
/// `0..10000` executed inside the pool yields `10000 * 9999 / 2`.
#[test]
fn test_create_thread_pool_tokio() {
    tokio::Runner::default().start(|context| async move {
        // Build the pool from a labelled context.
        let workers = NZUsize!(4);
        let pool = context
            .with_label("pool")
            .create_thread_pool(workers)
            .unwrap();

        // Input data and its closed-form sum.
        let values: Vec<i32> = (0..10000).collect();
        let expected = 10000 * 9999 / 2;

        // Run the parallel reduction inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, expected);
        });
    });
}
3464
/// Creates a thread pool of size 4 from a deterministic context and checks that a parallel
/// sum of `0..10000` executed inside the pool yields `10000 * 9999 / 2`.
#[test]
fn test_create_thread_pool_deterministic() {
    deterministic::Runner::default().start(|context| async move {
        // Build the pool from a labelled context.
        let workers = NZUsize!(4);
        let pool = context
            .with_label("pool")
            .create_thread_pool(workers)
            .unwrap();

        // Input data and its closed-form sum.
        let values: Vec<i32> = (0..10000).collect();
        let expected = 10000 * 9999 / 2;

        // Run the parallel reduction inside the pool.
        pool.install(|| {
            let total = values.par_iter().sum::<i32>();
            assert_eq!(total, expected);
        });
    });
}
3484
3485 fn test_buffer_pooler<R: Runner>(runner: R)
3486 where
3487 R::Context: BufferPooler,
3488 {
3489 runner.start(|context| async move {
3490 let net_buf = context.network_buffer_pool().try_alloc(1024).unwrap();
3492 assert!(net_buf.capacity() >= 1024);
3493
3494 let storage_buf = context.storage_buffer_pool().try_alloc(1024).unwrap();
3496 assert!(storage_buf.capacity() >= 4096);
3497
3498 assert_eq!(
3500 context.network_buffer_pool().config().max_per_class.get(),
3501 4096
3502 );
3503 assert_eq!(
3504 context.storage_buffer_pool().config().max_per_class.get(),
3505 32
3506 );
3507 });
3508 }
3509
/// Drives the shared `test_buffer_pooler` scenario with a default deterministic runner.
#[test]
fn test_deterministic_buffer_pooler() {
    test_buffer_pooler(deterministic::Runner::default());
}
3515
/// Drives the shared `test_buffer_pooler` scenario with a default tokio runner.
#[test]
fn test_tokio_buffer_pooler() {
    test_buffer_pooler(tokio::Runner::default());
}
3521}