1#![doc(
21 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22 html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_macros::select;
26use commonware_utils::StableBuf;
27use prometheus_client::registry::Metric;
28use std::{
29 future::Future,
30 io::Error as IoError,
31 net::SocketAddr,
32 time::{Duration, SystemTime},
33};
34use thiserror::Error;
35
36#[macro_use]
37mod macros;
38
39pub mod deterministic;
40pub mod mocks;
41cfg_if::cfg_if! {
42 if #[cfg(not(target_arch = "wasm32"))] {
43 pub mod tokio;
44 pub mod benchmarks;
45 }
46}
47mod network;
48mod process;
49mod storage;
50pub mod telemetry;
51mod utils;
52pub use utils::*;
53#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
54mod iouring;
55
56const METRICS_PREFIX: &str = "runtime";
58
59#[derive(Error, Debug)]
61pub enum Error {
62 #[error("exited")]
63 Exited,
64 #[error("closed")]
65 Closed,
66 #[error("timeout")]
67 Timeout,
68 #[error("bind failed")]
69 BindFailed,
70 #[error("connection failed")]
71 ConnectionFailed,
72 #[error("write failed")]
73 WriteFailed,
74 #[error("read failed")]
75 ReadFailed,
76 #[error("send failed")]
77 SendFailed,
78 #[error("recv failed")]
79 RecvFailed,
80 #[error("partition creation failed: {0}")]
81 PartitionCreationFailed(String),
82 #[error("partition missing: {0}")]
83 PartitionMissing(String),
84 #[error("partition corrupt: {0}")]
85 PartitionCorrupt(String),
86 #[error("blob open failed: {0}/{1} error: {2}")]
87 BlobOpenFailed(String, String, IoError),
88 #[error("blob missing: {0}/{1}")]
89 BlobMissing(String, String),
90 #[error("blob resize failed: {0}/{1} error: {2}")]
91 BlobResizeFailed(String, String, IoError),
92 #[error("blob sync failed: {0}/{1} error: {2}")]
93 BlobSyncFailed(String, String, IoError),
94 #[error("blob insufficient length")]
95 BlobInsufficientLength,
96 #[error("offset overflow")]
97 OffsetOverflow,
98 #[error("io error: {0}")]
99 Io(#[from] IoError),
100}
101
102pub trait Runner {
105 type Context;
111
112 fn start<F, Fut>(self, f: F) -> Fut::Output
118 where
119 F: FnOnce(Self::Context) -> Fut,
120 Fut: Future;
121}
122
123pub trait Spawner: Clone + Send + Sync + 'static {
125 fn shared(self, blocking: bool) -> Self;
134
135 fn dedicated(self) -> Self;
142
143 fn instrumented(self) -> Self;
145
146 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
180 where
181 F: FnOnce(Self) -> Fut + Send + 'static,
182 Fut: Future<Output = T> + Send + 'static,
183 T: Send + 'static;
184
185 fn stop(
205 self,
206 value: i32,
207 timeout: Option<Duration>,
208 ) -> impl Future<Output = Result<(), Error>> + Send;
209
210 fn stopped(&self) -> signal::Signal;
217}
218
219pub trait Metrics: Clone + Send + Sync + 'static {
221 fn label(&self) -> String;
223
224 fn with_label(&self, label: &str) -> Self;
232
233 fn scoped_label(&self, label: &str) -> String {
237 let label = if self.label().is_empty() {
238 label.to_string()
239 } else {
240 format!("{}_{}", self.label(), label)
241 };
242 assert!(
243 !label.starts_with(METRICS_PREFIX),
244 "using runtime label is not allowed"
245 );
246 label
247 }
248
249 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
253
254 fn encode(&self) -> String;
256}
257
258pub trait Clock: Clone + Send + Sync + 'static {
264 fn current(&self) -> SystemTime;
266
267 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
269
270 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
272
273 fn timeout<F, T>(
294 &self,
295 duration: Duration,
296 future: F,
297 ) -> impl Future<Output = Result<T, Error>> + Send + '_
298 where
299 F: Future<Output = T> + Send + 'static,
300 T: Send + 'static,
301 {
302 async move {
303 select! {
304 result = future => {
305 Ok(result)
306 },
307 _ = self.sleep(duration) => {
308 Err(Error::Timeout)
309 },
310 }
311 }
312 }
313}
314
315cfg_if::cfg_if! {
316 if #[cfg(feature = "external")] {
317 pub trait Pacer: Clock + Clone + Send + Sync + 'static {
319 fn pace<'a, F, T>(
339 &'a self,
340 latency: Duration,
341 future: F,
342 ) -> impl Future<Output = T> + Send + 'a
343 where
344 F: Future<Output = T> + Send + 'a,
345 T: Send + 'a;
346 }
347
348 pub trait FutureExt: Future + Send + Sized {
353 fn pace<'a, E>(
355 self,
356 pacer: &'a E,
357 latency: Duration,
358 ) -> impl Future<Output = Self::Output> + Send + 'a
359 where
360 E: Pacer + 'a,
361 Self: Send + 'a,
362 Self::Output: Send + 'a,
363 {
364 pacer.pace(latency, self)
365 }
366 }
367
368 impl<F> FutureExt for F where F: Future + Send {}
369 }
370}
371
372pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
374
375pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
377
378pub type ListenerOf<N> = <N as crate::Network>::Listener;
380
381pub trait Network: Clone + Send + Sync + 'static {
384 type Listener: Listener;
388
389 fn bind(
391 &self,
392 socket: SocketAddr,
393 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
394
395 fn dial(
397 &self,
398 socket: SocketAddr,
399 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
400}
401
402pub trait Listener: Sync + Send + 'static {
405 type Sink: Sink;
408 type Stream: Stream;
411
412 fn accept(
414 &mut self,
415 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
416
417 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
419}
420
421pub trait Sink: Sync + Send + 'static {
424 fn send(
426 &mut self,
427 msg: impl Into<StableBuf> + Send,
428 ) -> impl Future<Output = Result<(), Error>> + Send;
429}
430
431pub trait Stream: Sync + Send + 'static {
434 fn recv(
437 &mut self,
438 buf: impl Into<StableBuf> + Send,
439 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
440}
441
442pub trait Storage: Clone + Send + Sync + 'static {
450 type Blob: Blob;
452
453 fn open(
461 &self,
462 partition: &str,
463 name: &[u8],
464 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
465
466 fn remove(
472 &self,
473 partition: &str,
474 name: Option<&[u8]>,
475 ) -> impl Future<Output = Result<(), Error>> + Send;
476
477 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
479}
480
481#[allow(clippy::len_without_is_empty)]
496pub trait Blob: Clone + Send + Sync + 'static {
497 fn read_at(
502 &self,
503 buf: impl Into<StableBuf> + Send,
504 offset: u64,
505 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
506
507 fn write_at(
509 &self,
510 buf: impl Into<StableBuf> + Send,
511 offset: u64,
512 ) -> impl Future<Output = Result<(), Error>> + Send;
513
514 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
519
520 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use crate::telemetry::traces::collector::TraceStorage;
528 use bytes::Bytes;
529 use commonware_macros::{select, test_collect_traces};
530 use futures::{
531 channel::{mpsc, oneshot},
532 future::{pending, ready},
533 join, pin_mut, FutureExt, SinkExt, StreamExt,
534 };
535 use prometheus_client::metrics::counter::Counter;
536 use std::{
537 collections::HashMap,
538 pin::Pin,
539 str::FromStr,
540 sync::{
541 atomic::{AtomicU32, Ordering},
542 Arc, Mutex,
543 },
544 task::{Context as TContext, Poll, Waker},
545 };
546 use tracing::{error, Level};
547 use utils::reschedule;
548
549 fn test_error_future<R: Runner>(runner: R) {
550 async fn error_future() -> Result<&'static str, &'static str> {
551 Err("An error occurred")
552 }
553 let result = runner.start(|_| error_future());
554 assert_eq!(result, Err("An error occurred"));
555 }
556
557 fn test_clock_sleep<R: Runner>(runner: R)
558 where
559 R::Context: Spawner + Clock,
560 {
561 runner.start(|context| async move {
562 let start = context.current();
564 let sleep_duration = Duration::from_millis(10);
565 context.sleep(sleep_duration).await;
566
567 let end = context.current();
569 assert!(end.duration_since(start).unwrap() >= sleep_duration);
570 });
571 }
572
573 fn test_clock_sleep_until<R: Runner>(runner: R)
574 where
575 R::Context: Spawner + Clock + Metrics,
576 {
577 runner.start(|context| async move {
578 let now = context.current();
580 context.sleep_until(now + Duration::from_millis(100)).await;
581
582 let elapsed = now.elapsed().unwrap();
584 assert!(elapsed >= Duration::from_millis(100));
585 });
586 }
587
588 fn test_clock_timeout<R: Runner>(runner: R)
589 where
590 R::Context: Spawner + Clock,
591 {
592 runner.start(|context| async move {
593 let result = context
595 .timeout(Duration::from_millis(100), async { "success" })
596 .await;
597 assert_eq!(result.unwrap(), "success");
598
599 let result = context
601 .timeout(Duration::from_millis(50), pending::<()>())
602 .await;
603 assert!(matches!(result, Err(Error::Timeout)));
604
605 let result = context
607 .timeout(
608 Duration::from_millis(100),
609 context.sleep(Duration::from_millis(50)),
610 )
611 .await;
612 assert!(result.is_ok());
613 });
614 }
615
616 fn test_root_finishes<R: Runner>(runner: R)
617 where
618 R::Context: Spawner,
619 {
620 runner.start(|context| async move {
621 context.spawn(|_| async move {
622 loop {
623 reschedule().await;
624 }
625 });
626 });
627 }
628
629 fn test_spawn_after_abort<R>(runner: R)
630 where
631 R: Runner,
632 R::Context: Spawner + Clone,
633 {
634 runner.start(|context| async move {
635 let child = context.clone();
637
638 let parent_handle = context.spawn(move |_| async move {
640 pending::<()>().await;
641 });
642 parent_handle.abort();
643
644 let child_handle = child.spawn(move |_| async move {
646 pending::<()>().await;
647 });
648 assert!(matches!(child_handle.await, Err(Error::Closed)));
649 });
650 }
651
652 fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
653 where
654 R::Context: Spawner,
655 {
656 runner.start(|context| async move {
657 let context = if dedicated {
658 assert!(!blocking);
659 context.dedicated()
660 } else {
661 context.shared(blocking)
662 };
663
664 let handle = context.spawn(|_| async move {
665 loop {
666 reschedule().await;
667 }
668 });
669 handle.abort();
670 assert!(matches!(handle.await, Err(Error::Closed)));
671 });
672 }
673
674 fn test_panic_aborts_root<R: Runner>(runner: R) {
675 let result: Result<(), Error> = runner.start(|_| async move {
676 panic!("blah");
677 });
678 result.unwrap_err();
679 }
680
681 fn test_panic_aborts_spawn<R: Runner>(runner: R)
682 where
683 R::Context: Spawner + Clock,
684 {
685 runner.start(|context| async move {
686 context.clone().spawn(|_| async move {
687 panic!("blah");
688 });
689
690 loop {
692 context.sleep(Duration::from_millis(100)).await;
693 }
694 });
695 }
696
697 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
698 where
699 R::Context: Spawner + Clock,
700 {
701 let result: Result<(), Error> = runner.start(|context| async move {
702 let result = context.clone().spawn(|_| async move {
703 panic!("blah");
704 });
705 result.await
706 });
707 assert!(matches!(result, Err(Error::Exited)));
708 }
709
710 fn test_multiple_panics<R: Runner>(runner: R)
711 where
712 R::Context: Spawner + Clock,
713 {
714 runner.start(|context| async move {
715 context.clone().spawn(|_| async move {
716 panic!("boom 1");
717 });
718 context.clone().spawn(|_| async move {
719 panic!("boom 2");
720 });
721 context.clone().spawn(|_| async move {
722 panic!("boom 3");
723 });
724
725 loop {
727 context.sleep(Duration::from_millis(100)).await;
728 }
729 });
730 }
731
732 fn test_multiple_panics_caught<R: Runner>(runner: R)
733 where
734 R::Context: Spawner + Clock,
735 {
736 let (res1, res2, res3) = runner.start(|context| async move {
737 let handle1 = context.clone().spawn(|_| async move {
738 panic!("boom 1");
739 });
740 let handle2 = context.clone().spawn(|_| async move {
741 panic!("boom 2");
742 });
743 let handle3 = context.clone().spawn(|_| async move {
744 panic!("boom 3");
745 });
746
747 join!(handle1, handle2, handle3)
748 });
749 assert!(matches!(res1, Err(Error::Exited)));
750 assert!(matches!(res2, Err(Error::Exited)));
751 assert!(matches!(res3, Err(Error::Exited)));
752 }
753
754 fn test_select<R: Runner>(runner: R) {
755 runner.start(|_| async move {
756 let output = Mutex::new(0);
758 select! {
759 v1 = ready(1) => {
760 *output.lock().unwrap() = v1;
761 },
762 v2 = ready(2) => {
763 *output.lock().unwrap() = v2;
764 },
765 };
766 assert_eq!(*output.lock().unwrap(), 1);
767
768 select! {
770 v1 = std::future::pending::<i32>() => {
771 *output.lock().unwrap() = v1;
772 },
773 v2 = ready(2) => {
774 *output.lock().unwrap() = v2;
775 },
776 };
777 assert_eq!(*output.lock().unwrap(), 2);
778 });
779 }
780
781 fn test_select_loop<R: Runner>(runner: R)
783 where
784 R::Context: Clock,
785 {
786 runner.start(|context| async move {
787 let (mut sender, mut receiver) = mpsc::unbounded();
789 for _ in 0..2 {
790 select! {
791 v = receiver.next() => {
792 panic!("unexpected value: {v:?}");
793 },
794 _ = context.sleep(Duration::from_millis(100)) => {
795 continue;
796 },
797 };
798 }
799
800 sender.send(0).await.unwrap();
802 sender.send(1).await.unwrap();
803
804 select! {
806 _ = async {} => {
807 },
809 v = receiver.next() => {
810 panic!("unexpected value: {v:?}");
811 },
812 };
813
814 for i in 0..2 {
816 select! {
817 _ = context.sleep(Duration::from_millis(100)) => {
818 panic!("timeout");
819 },
820 v = receiver.next() => {
821 assert_eq!(v.unwrap(), i);
822 },
823 };
824 }
825 });
826 }
827
828 fn test_storage_operations<R: Runner>(runner: R)
829 where
830 R::Context: Storage,
831 {
832 runner.start(|context| async move {
833 let partition = "test_partition";
834 let name = b"test_blob";
835
836 let (blob, _) = context
838 .open(partition, name)
839 .await
840 .expect("Failed to open blob");
841
842 let data = b"Hello, Storage!";
844 blob.write_at(Vec::from(data), 0)
845 .await
846 .expect("Failed to write to blob");
847
848 blob.sync().await.expect("Failed to sync blob");
850
851 let read = blob
853 .read_at(vec![0; data.len()], 0)
854 .await
855 .expect("Failed to read from blob");
856 assert_eq!(read.as_ref(), data);
857
858 blob.sync().await.expect("Failed to sync blob");
860
861 let blobs = context
863 .scan(partition)
864 .await
865 .expect("Failed to scan partition");
866 assert!(blobs.contains(&name.to_vec()));
867
868 let (blob, len) = context
870 .open(partition, name)
871 .await
872 .expect("Failed to reopen blob");
873 assert_eq!(len, data.len() as u64);
874
875 let read = blob
877 .read_at(vec![0u8; 7], 7)
878 .await
879 .expect("Failed to read data");
880 assert_eq!(read.as_ref(), b"Storage");
881
882 blob.sync().await.expect("Failed to sync blob");
884
885 context
887 .remove(partition, Some(name))
888 .await
889 .expect("Failed to remove blob");
890
891 let blobs = context
893 .scan(partition)
894 .await
895 .expect("Failed to scan partition");
896 assert!(!blobs.contains(&name.to_vec()));
897
898 context
900 .remove(partition, None)
901 .await
902 .expect("Failed to remove partition");
903
904 let result = context.scan(partition).await;
906 assert!(matches!(result, Err(Error::PartitionMissing(_))));
907 });
908 }
909
910 fn test_blob_read_write<R: Runner>(runner: R)
911 where
912 R::Context: Storage,
913 {
914 runner.start(|context| async move {
915 let partition = "test_partition";
916 let name = b"test_blob_rw";
917
918 let (blob, _) = context
920 .open(partition, name)
921 .await
922 .expect("Failed to open blob");
923
924 let data1 = b"Hello";
926 let data2 = b"World";
927 blob.write_at(Vec::from(data1), 0)
928 .await
929 .expect("Failed to write data1");
930 blob.write_at(Vec::from(data2), 5)
931 .await
932 .expect("Failed to write data2");
933
934 let read = blob
936 .read_at(vec![0u8; 10], 0)
937 .await
938 .expect("Failed to read data");
939 assert_eq!(&read.as_ref()[..5], data1);
940 assert_eq!(&read.as_ref()[5..], data2);
941
942 let result = blob.read_at(vec![0u8; 10], 10).await;
944 assert!(result.is_err());
945
946 let data3 = b"Store";
948 blob.write_at(Vec::from(data3), 5)
949 .await
950 .expect("Failed to write data3");
951
952 let read = blob
954 .read_at(vec![0u8; 10], 0)
955 .await
956 .expect("Failed to read data");
957 assert_eq!(&read.as_ref()[..5], data1);
958 assert_eq!(&read.as_ref()[5..], data3);
959
960 let result = blob.read_at(vec![0u8; 10], 10).await;
962 assert!(result.is_err());
963 });
964 }
965
966 fn test_blob_resize<R: Runner>(runner: R)
967 where
968 R::Context: Storage,
969 {
970 runner.start(|context| async move {
971 let partition = "test_partition_resize";
972 let name = b"test_blob_resize";
973
974 let (blob, _) = context
976 .open(partition, name)
977 .await
978 .expect("Failed to open blob");
979
980 let data = b"some data";
981 blob.write_at(data.to_vec(), 0)
982 .await
983 .expect("Failed to write");
984 blob.sync().await.expect("Failed to sync after write");
985
986 let (blob, len) = context.open(partition, name).await.unwrap();
988 assert_eq!(len, data.len() as u64);
989
990 let new_len = (data.len() as u64) * 2;
992 blob.resize(new_len)
993 .await
994 .expect("Failed to resize to extend");
995 blob.sync().await.expect("Failed to sync after resize");
996
997 let (blob, len) = context.open(partition, name).await.unwrap();
999 assert_eq!(len, new_len);
1000
1001 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1003 assert_eq!(read_buf.as_ref(), data);
1004
1005 let extended_part = blob
1007 .read_at(vec![0; data.len()], data.len() as u64)
1008 .await
1009 .unwrap();
1010 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1011
1012 blob.resize(data.len() as u64).await.unwrap();
1014 blob.sync().await.unwrap();
1015
1016 let (blob, size) = context.open(partition, name).await.unwrap();
1018 assert_eq!(size, data.len() as u64);
1019
1020 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1022 assert_eq!(read_buf.as_ref(), data);
1023 blob.sync().await.unwrap();
1024 });
1025 }
1026
1027 fn test_many_partition_read_write<R: Runner>(runner: R)
1028 where
1029 R::Context: Storage,
1030 {
1031 runner.start(|context| async move {
1032 let partitions = ["partition1", "partition2", "partition3"];
1033 let name = b"test_blob_rw";
1034 let data1 = b"Hello";
1035 let data2 = b"World";
1036
1037 for (additional, partition) in partitions.iter().enumerate() {
1038 let (blob, _) = context
1040 .open(partition, name)
1041 .await
1042 .expect("Failed to open blob");
1043
1044 blob.write_at(Vec::from(data1), 0)
1046 .await
1047 .expect("Failed to write data1");
1048 blob.write_at(Vec::from(data2), 5 + additional as u64)
1049 .await
1050 .expect("Failed to write data2");
1051
1052 blob.sync().await.expect("Failed to sync blob");
1054 }
1055
1056 for (additional, partition) in partitions.iter().enumerate() {
1057 let (blob, len) = context
1059 .open(partition, name)
1060 .await
1061 .expect("Failed to open blob");
1062 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1063
1064 let read = blob
1066 .read_at(vec![0u8; 10 + additional], 0)
1067 .await
1068 .expect("Failed to read data");
1069 assert_eq!(&read.as_ref()[..5], b"Hello");
1070 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1071 }
1072 });
1073 }
1074
1075 fn test_blob_read_past_length<R: Runner>(runner: R)
1076 where
1077 R::Context: Storage,
1078 {
1079 runner.start(|context| async move {
1080 let partition = "test_partition";
1081 let name = b"test_blob_rw";
1082
1083 let (blob, _) = context
1085 .open(partition, name)
1086 .await
1087 .expect("Failed to open blob");
1088
1089 let result = blob.read_at(vec![0u8; 10], 0).await;
1091 assert!(result.is_err());
1092
1093 let data = b"Hello, Storage!".to_vec();
1095 blob.write_at(data, 0)
1096 .await
1097 .expect("Failed to write to blob");
1098
1099 let result = blob.read_at(vec![0u8; 20], 0).await;
1101 assert!(result.is_err());
1102 })
1103 }
1104
1105 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
1106 where
1107 R::Context: Spawner + Storage + Metrics,
1108 {
1109 runner.start(|context| async move {
1110 let partition = "test_partition";
1111 let name = b"test_blob_rw";
1112
1113 let (blob, _) = context
1115 .open(partition, name)
1116 .await
1117 .expect("Failed to open blob");
1118
1119 let data = b"Hello, Storage!";
1121 blob.write_at(Vec::from(data), 0)
1122 .await
1123 .expect("Failed to write to blob");
1124
1125 blob.sync().await.expect("Failed to sync blob");
1127
1128 let check1 = context.with_label("check1").spawn({
1130 let blob = blob.clone();
1131 move |_| async move {
1132 let read = blob
1133 .read_at(vec![0u8; data.len()], 0)
1134 .await
1135 .expect("Failed to read from blob");
1136 assert_eq!(read.as_ref(), data);
1137 }
1138 });
1139 let check2 = context.with_label("check2").spawn({
1140 let blob = blob.clone();
1141 move |_| async move {
1142 let read = blob
1143 .read_at(vec![0; data.len()], 0)
1144 .await
1145 .expect("Failed to read from blob");
1146 assert_eq!(read.as_ref(), data);
1147 }
1148 });
1149
1150 let result = join!(check1, check2);
1152 assert!(result.0.is_ok());
1153 assert!(result.1.is_ok());
1154
1155 let read = blob
1157 .read_at(vec![0; data.len()], 0)
1158 .await
1159 .expect("Failed to read from blob");
1160 assert_eq!(read.as_ref(), data);
1161
1162 drop(blob);
1164
1165 let buffer = context.encode();
1167 assert!(buffer.contains("open_blobs 0"));
1168 });
1169 }
1170
1171 fn test_shutdown<R: Runner>(runner: R)
1172 where
1173 R::Context: Spawner + Metrics + Clock,
1174 {
1175 let kill = 9;
1176 runner.start(|context| async move {
1177 let before = context
1179 .with_label("before")
1180 .spawn(move |context| async move {
1181 let mut signal = context.stopped();
1182 let value = (&mut signal).await.unwrap();
1183 assert_eq!(value, kill);
1184 drop(signal);
1185 });
1186
1187 let result = context.clone().stop(kill, None).await;
1189 assert!(result.is_ok());
1190
1191 let after = context
1193 .with_label("after")
1194 .spawn(move |context| async move {
1195 let value = context.stopped().await.unwrap();
1197 assert_eq!(value, kill);
1198 });
1199
1200 let result = join!(before, after);
1202 assert!(result.0.is_ok());
1203 assert!(result.1.is_ok());
1204 });
1205 }
1206
1207 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
1208 where
1209 R::Context: Spawner + Metrics + Clock,
1210 {
1211 let kill = 42;
1212 runner.start(|context| async move {
1213 let (started_tx, mut started_rx) = mpsc::channel(3);
1214 let counter = Arc::new(AtomicU32::new(0));
1215
1216 let task = |cleanup_duration: Duration| {
1219 let context = context.clone();
1220 let counter = counter.clone();
1221 let mut started_tx = started_tx.clone();
1222 context.spawn(move |context| async move {
1223 let mut signal = context.stopped();
1225 started_tx.send(()).await.unwrap();
1226
1227 let value = (&mut signal).await.unwrap();
1229 assert_eq!(value, kill);
1230 context.sleep(cleanup_duration).await;
1231 counter.fetch_add(1, Ordering::SeqCst);
1232
1233 drop(signal);
1235 })
1236 };
1237
1238 let task1 = task(Duration::from_millis(10));
1239 let task2 = task(Duration::from_millis(20));
1240 let task3 = task(Duration::from_millis(30));
1241
1242 for _ in 0..3 {
1244 started_rx.next().await.unwrap();
1245 }
1246
1247 context.stop(kill, None).await.unwrap();
1249 assert_eq!(counter.load(Ordering::SeqCst), 3);
1250
1251 let result = join!(task1, task2, task3);
1253 assert!(result.0.is_ok());
1254 assert!(result.1.is_ok());
1255 assert!(result.2.is_ok());
1256 });
1257 }
1258
1259 fn test_shutdown_timeout<R: Runner>(runner: R)
1260 where
1261 R::Context: Spawner + Metrics + Clock,
1262 {
1263 let kill = 42;
1264 runner.start(|context| async move {
1265 let (started_tx, started_rx) = oneshot::channel();
1267
1268 context.clone().spawn(move |context| async move {
1270 let signal = context.stopped();
1271 started_tx.send(()).unwrap();
1272 pending::<()>().await;
1273 signal.await.unwrap();
1274 });
1275
1276 started_rx.await.unwrap();
1278 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1279
1280 assert!(matches!(result, Err(Error::Timeout)));
1282 });
1283 }
1284
1285 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1286 where
1287 R::Context: Spawner + Metrics + Clock,
1288 {
1289 let kill1 = 42;
1290 let kill2 = 43;
1291
1292 runner.start(|context| async move {
1293 let (started_tx, started_rx) = oneshot::channel();
1294 let counter = Arc::new(AtomicU32::new(0));
1295
1296 let task = context.with_label("blocking_task").spawn({
1298 let counter = counter.clone();
1299 move |context| async move {
1300 let mut signal = context.stopped();
1302 started_tx.send(()).unwrap();
1303
1304 let value = (&mut signal).await.unwrap();
1306 assert_eq!(value, kill1);
1307 context.sleep(Duration::from_millis(50)).await;
1308
1309 counter.fetch_add(1, Ordering::SeqCst);
1311 drop(signal);
1312 }
1313 });
1314
1315 started_rx.await.unwrap();
1317
1318 let stop_task1 = context.clone().stop(kill1, None);
1321 pin_mut!(stop_task1);
1322 let stop_task2 = context.clone().stop(kill2, None);
1323 pin_mut!(stop_task2);
1324
1325 assert!(stop_task1.as_mut().now_or_never().is_none());
1327 assert!(stop_task2.as_mut().now_or_never().is_none());
1328
1329 assert!(stop_task1.await.is_ok());
1331 assert!(stop_task2.await.is_ok());
1332
1333 let sig = context.stopped().await;
1335 assert_eq!(sig.unwrap(), kill1);
1336
1337 let result = task.await;
1339 assert!(result.is_ok());
1340 assert_eq!(counter.load(Ordering::SeqCst), 1);
1341
1342 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1344 });
1345 }
1346
1347 fn test_unfulfilled_shutdown<R: Runner>(runner: R)
1348 where
1349 R::Context: Spawner + Metrics,
1350 {
1351 runner.start(|context| async move {
1352 context
1354 .with_label("before")
1355 .spawn(move |context| async move {
1356 let mut signal = context.stopped();
1357 let value = (&mut signal).await.unwrap();
1358
1359 assert_eq!(value, 42);
1361 drop(signal);
1362 });
1363
1364 reschedule().await;
1366 });
1367 }
1368
1369 fn test_spawn_dedicated<R: Runner>(runner: R)
1370 where
1371 R::Context: Spawner,
1372 {
1373 runner.start(|context| async move {
1374 let handle = context.dedicated().spawn(|_| async move { 42 });
1375 assert!(matches!(handle.await, Ok(42)));
1376 });
1377 }
1378
1379 fn test_spawn<R: Runner>(runner: R)
1380 where
1381 R::Context: Spawner + Clock,
1382 {
1383 runner.start(|context| async move {
1384 let child_handle = Arc::new(Mutex::new(None));
1385 let child_handle2 = child_handle.clone();
1386
1387 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1388 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1389 let parent_handle = context.spawn(move |context| async move {
1390 let handle = context.spawn(|_| async {});
1392
1393 *child_handle2.lock().unwrap() = Some(handle);
1395
1396 parent_initialized_tx.send(()).unwrap();
1397
1398 parent_complete_rx.await.unwrap();
1400 });
1401
1402 parent_initialized_rx.await.unwrap();
1404
1405 let child_handle = child_handle.lock().unwrap().take().unwrap();
1407 assert!(child_handle.await.is_ok());
1408
1409 parent_complete_tx.send(()).unwrap();
1411
1412 assert!(parent_handle.await.is_ok());
1414 });
1415 }
1416
1417 fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
1418 where
1419 R::Context: Spawner + Clock,
1420 {
1421 runner.start(|context| async move {
1422 let child_handle = Arc::new(Mutex::new(None));
1423 let child_handle2 = child_handle.clone();
1424
1425 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1426 let parent_handle = context.spawn(move |context| async move {
1427 let handle = context.spawn(|_| pending::<()>());
1429
1430 *child_handle2.lock().unwrap() = Some(handle);
1432
1433 parent_initialized_tx.send(()).unwrap();
1434
1435 pending::<()>().await
1437 });
1438
1439 parent_initialized_rx.await.unwrap();
1441
1442 parent_handle.abort();
1444 assert!(matches!(parent_handle.await, Err(Error::Closed)));
1445
1446 let child_handle = child_handle.lock().unwrap().take().unwrap();
1448 assert!(matches!(child_handle.await, Err(Error::Closed)));
1449 });
1450 }
1451
1452 fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
1453 where
1454 R::Context: Spawner + Clock,
1455 {
1456 runner.start(|context| async move {
1457 let child_handle = Arc::new(Mutex::new(None));
1458 let child_handle2 = child_handle.clone();
1459
1460 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1461 let parent_handle = context.spawn(move |context| async move {
1462 let handle = context.spawn(|_| pending::<()>());
1464
1465 *child_handle2.lock().unwrap() = Some(handle);
1467
1468 parent_complete_rx.await.unwrap();
1470 });
1471
1472 parent_complete_tx.send(()).unwrap();
1474
1475 assert!(parent_handle.await.is_ok());
1477
1478 let child_handle = child_handle.lock().unwrap().take().unwrap();
1480 assert!(matches!(child_handle.await, Err(Error::Closed)));
1481 });
1482 }
1483
1484 fn test_spawn_cascading_abort<R: Runner>(runner: R)
1485 where
1486 R::Context: Spawner + Clock,
1487 {
1488 runner.start(|context| async move {
1489 let c0 = context.clone();
1499 let g0 = c0.clone();
1500 let g1 = c0.clone();
1501 let c1 = context.clone();
1502 let g2 = c1.clone();
1503 let g3 = c1.clone();
1504 let c2 = context.clone();
1505 let g4 = c2.clone();
1506 let g5 = c2.clone();
1507
1508 let handles = Arc::new(Mutex::new(Vec::new()));
1510 let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
1511 let root_task = context.spawn({
1512 let handles = handles.clone();
1513 move |_| async move {
1514 for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
1515 {
1516 let handle = context.spawn({
1517 let handles = handles.clone();
1518 let mut initialized_tx = initialized_tx.clone();
1519 move |_| async move {
1520 for grandchild in grandchildren {
1521 let handle = grandchild.spawn(|_| async {
1522 pending::<()>().await;
1523 });
1524 handles.lock().unwrap().push(handle);
1525 initialized_tx.send(()).await.unwrap();
1526 }
1527
1528 pending::<()>().await;
1529 }
1530 });
1531 handles.lock().unwrap().push(handle);
1532 initialized_tx.send(()).await.unwrap();
1533 }
1534
1535 pending::<()>().await;
1536 }
1537 });
1538
1539 for _ in 0..9 {
1541 initialized_rx.next().await.unwrap();
1542 }
1543
1544 assert_eq!(handles.lock().unwrap().len(), 9);
1546
1547 root_task.abort();
1549 assert!(matches!(root_task.await, Err(Error::Closed)));
1550
1551 let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
1553 for handle in handles {
1554 assert!(matches!(handle.await, Err(Error::Closed)));
1555 }
1556 });
1557 }
1558
1559 fn test_child_survives_sibling_completion<R: Runner>(runner: R)
1560 where
1561 R::Context: Spawner + Clock,
1562 {
1563 runner.start(|context| async move {
1564 let (child_started_tx, child_started_rx) = oneshot::channel();
1565 let (child_complete_tx, child_complete_rx) = oneshot::channel();
1566 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1567 let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
1568 let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
1569 let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
1570 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1571
1572 let parent = context.spawn(move |context| async move {
1573 let child_handle = context.clone().spawn(|_| async move {
1575 child_started_tx.send(()).unwrap();
1576 child_complete_rx.await.unwrap();
1578 });
1579 assert!(
1580 child_handle_tx.send(child_handle).is_ok(),
1581 "child handle receiver dropped"
1582 );
1583
1584 let sibling_handle = context.clone().spawn(move |_| async move {
1586 sibling_started_tx.send(()).unwrap();
1587 sibling_complete_rx.await.unwrap();
1589 });
1590 assert!(
1591 sibling_handle_tx.send(sibling_handle).is_ok(),
1592 "sibling handle receiver dropped"
1593 );
1594
1595 parent_complete_rx.await.unwrap();
1597 });
1598
1599 child_started_rx.await.unwrap();
1601 sibling_started_rx.await.unwrap();
1602
1603 sibling_complete_tx.send(()).unwrap();
1605 assert!(sibling_handle_rx.await.is_ok());
1606
1607 child_complete_tx.send(()).unwrap();
1609 assert!(child_handle_rx.await.is_ok());
1610
1611 parent_complete_tx.send(()).unwrap();
1613 assert!(parent.await.is_ok());
1614 });
1615 }
1616
1617 fn test_spawn_clone_chain<R: Runner>(runner: R)
1618 where
1619 R::Context: Spawner + Clock,
1620 {
1621 runner.start(|context| async move {
1622 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1623 let (child_started_tx, child_started_rx) = oneshot::channel();
1624 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1625 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1626 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1627
1628 let parent = context.clone().spawn({
1629 move |context| async move {
1630 let child = context.clone().spawn({
1631 move |context| async move {
1632 let grandchild = context.clone().spawn({
1633 move |_| async move {
1634 grandchild_started_tx.send(()).unwrap();
1635 pending::<()>().await;
1636 }
1637 });
1638 assert!(
1639 grandchild_handle_tx.send(grandchild).is_ok(),
1640 "grandchild handle receiver dropped"
1641 );
1642 child_started_tx.send(()).unwrap();
1643 pending::<()>().await;
1644 }
1645 });
1646 assert!(
1647 child_handle_tx.send(child).is_ok(),
1648 "child handle receiver dropped"
1649 );
1650 parent_started_tx.send(()).unwrap();
1651 pending::<()>().await;
1652 }
1653 });
1654
1655 parent_started_rx.await.unwrap();
1656 child_started_rx.await.unwrap();
1657 grandchild_started_rx.await.unwrap();
1658
1659 let child_handle = child_handle_rx.await.unwrap();
1660 let grandchild_handle = grandchild_handle_rx.await.unwrap();
1661
1662 parent.abort();
1663 assert!(parent.await.is_err());
1664
1665 assert!(child_handle.await.is_err());
1666 assert!(grandchild_handle.await.is_err());
1667 });
1668 }
1669
1670 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1671 where
1672 R::Context: Spawner + Clock,
1673 {
1674 runner.start(|context| async move {
1675 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1676 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1677
1678 let parent = context.clone().spawn({
1679 move |context| async move {
1680 let clone1 = context.clone();
1681 let clone2 = clone1.clone();
1682 let clone3 = clone2.clone();
1683
1684 let leaf = clone3.clone().spawn({
1685 move |_| async move {
1686 leaf_started_tx.send(()).unwrap();
1687 pending::<()>().await;
1688 }
1689 });
1690
1691 leaf_handle_tx
1692 .send(leaf)
1693 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1694 pending::<()>().await;
1695 }
1696 });
1697
1698 leaf_started_rx.await.unwrap();
1699 let leaf_handle = leaf_handle_rx.await.unwrap();
1700
1701 parent.abort();
1702 assert!(parent.await.is_err());
1703 assert!(leaf_handle.await.is_err());
1704 });
1705 }
1706
1707 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1708 where
1709 R::Context: Spawner,
1710 {
1711 runner.start(|context| async move {
1712 let context = if dedicated {
1713 context.dedicated()
1714 } else {
1715 context.shared(true)
1716 };
1717
1718 let handle = context.spawn(|_| async move { 42 });
1719 let result = handle.await;
1720 assert!(matches!(result, Ok(42)));
1721 });
1722 }
1723
1724 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1725 where
1726 R::Context: Spawner + Clock,
1727 {
1728 runner.start(|context| async move {
1729 let context = if dedicated {
1730 context.dedicated()
1731 } else {
1732 context.shared(true)
1733 };
1734
1735 context.clone().spawn(|_| async move {
1736 panic!("blocking task panicked");
1737 });
1738
1739 loop {
1741 context.sleep(Duration::from_millis(100)).await;
1742 }
1743 });
1744 }
1745
1746 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1747 where
1748 R::Context: Spawner + Clock,
1749 {
1750 let result: Result<(), Error> = runner.start(|context| async move {
1751 let context = if dedicated {
1752 context.dedicated()
1753 } else {
1754 context.shared(true)
1755 };
1756
1757 let handle = context.clone().spawn(|_| async move {
1758 panic!("blocking task panicked");
1759 });
1760 handle.await
1761 });
1762 assert!(matches!(result, Err(Error::Exited)));
1763 }
1764
1765 fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1766 runner.start(|_| async move {
1767 let dropper = Arc::new(());
1769 let executor = deterministic::Runner::default();
1770 executor.start({
1771 let dropper = dropper.clone();
1772 move |context| async move {
1773 let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1775 let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1776 let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1777
1778 context.with_label("task1").spawn({
1780 let mut setup_tx = setup_tx.clone();
1781 let dropper = dropper.clone();
1782 move |_| async move {
1783 tx2.send(()).await.unwrap();
1785 rx1.next().await.unwrap();
1786 setup_tx.send(()).await.unwrap();
1787
1788 while rx1.next().await.is_some() {}
1790 drop(tx2);
1791 drop(dropper);
1792 }
1793 });
1794
1795 context.with_label("task2").spawn(move |_| async move {
1797 tx1.send(()).await.unwrap();
1799 rx2.next().await.unwrap();
1800 setup_tx.send(()).await.unwrap();
1801
1802 while rx2.next().await.is_some() {}
1804 drop(tx1);
1805 drop(dropper);
1806 });
1807
1808 setup_rx.next().await.unwrap();
1810 setup_rx.next().await.unwrap();
1811 }
1812 });
1813
1814 Arc::try_unwrap(dropper).expect("references remaining");
1816 });
1817 }
1818
1819 fn test_late_waker<R: Runner>(runner: R)
1820 where
1821 R::Context: Metrics + Spawner,
1822 {
1823 struct CaptureWaker {
1826 tx: Option<oneshot::Sender<Waker>>,
1827 sent: bool,
1828 }
1829 impl Future for CaptureWaker {
1830 type Output = ();
1831 fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1832 if !self.sent {
1833 if let Some(tx) = self.tx.take() {
1834 let _ = tx.send(cx.waker().clone());
1836 }
1837 self.sent = true;
1838 }
1839 Poll::Pending
1840 }
1841 }
1842
1843 struct WakeOnDrop(Option<Waker>);
1845 impl Drop for WakeOnDrop {
1846 fn drop(&mut self) {
1847 if let Some(w) = self.0.take() {
1848 w.wake_by_ref();
1849 }
1850 }
1851 }
1852
1853 let holder = runner.start(|context| async move {
1855 let (tx, rx) = oneshot::channel::<Waker>();
1857
1858 context
1860 .with_label("capture-waker")
1861 .spawn(move |_| async move {
1862 CaptureWaker {
1863 tx: Some(tx),
1864 sent: false,
1865 }
1866 .await;
1867 });
1868
1869 utils::reschedule().await;
1871
1872 let waker = rx.await.expect("waker not received");
1874
1875 WakeOnDrop(Some(waker))
1877 });
1878
1879 drop(holder);
1882 }
1883
1884 fn test_metrics<R: Runner>(runner: R)
1885 where
1886 R::Context: Metrics,
1887 {
1888 runner.start(|context| async move {
1889 assert_eq!(context.label(), "");
1891
1892 let counter = Counter::<u64>::default();
1894 context.register("test", "test", counter.clone());
1895
1896 counter.inc();
1898
1899 let buffer = context.encode();
1901 assert!(buffer.contains("test_total 1"));
1902
1903 let context = context.with_label("nested");
1905 let nested_counter = Counter::<u64>::default();
1906 context.register("test", "test", nested_counter.clone());
1907
1908 nested_counter.inc();
1910
1911 let buffer = context.encode();
1913 assert!(buffer.contains("nested_test_total 1"));
1914 assert!(buffer.contains("test_total 1"));
1915 });
1916 }
1917
1918 fn test_metrics_label<R: Runner>(runner: R)
1919 where
1920 R::Context: Metrics,
1921 {
1922 runner.start(|context| async move {
1923 context.with_label(METRICS_PREFIX);
1924 })
1925 }
1926
1927 #[test]
1928 fn test_deterministic_future() {
1929 let runner = deterministic::Runner::default();
1930 test_error_future(runner);
1931 }
1932
1933 #[test]
1934 fn test_deterministic_clock_sleep() {
1935 let executor = deterministic::Runner::default();
1936 test_clock_sleep(executor);
1937 }
1938
1939 #[test]
1940 fn test_deterministic_clock_sleep_until() {
1941 let executor = deterministic::Runner::default();
1942 test_clock_sleep_until(executor);
1943 }
1944
1945 #[test]
1946 fn test_deterministic_clock_timeout() {
1947 let executor = deterministic::Runner::default();
1948 test_clock_timeout(executor);
1949 }
1950
1951 #[test]
1952 fn test_deterministic_root_finishes() {
1953 let executor = deterministic::Runner::default();
1954 test_root_finishes(executor);
1955 }
1956
1957 #[test]
1958 fn test_deterministic_spawn_after_abort() {
1959 let executor = deterministic::Runner::default();
1960 test_spawn_after_abort(executor);
1961 }
1962
1963 #[test]
1964 fn test_deterministic_spawn_abort() {
1965 let executor = deterministic::Runner::default();
1966 test_spawn_abort(executor, false, false);
1967 }
1968
1969 #[test]
1970 #[should_panic(expected = "blah")]
1971 fn test_deterministic_panic_aborts_root() {
1972 let runner = deterministic::Runner::default();
1973 test_panic_aborts_root(runner);
1974 }
1975
1976 #[test]
1977 #[should_panic(expected = "blah")]
1978 fn test_deterministic_panic_aborts_root_caught() {
1979 let cfg = deterministic::Config::default().with_catch_panics(true);
1980 let runner = deterministic::Runner::new(cfg);
1981 test_panic_aborts_root(runner);
1982 }
1983
1984 #[test]
1985 #[should_panic(expected = "blah")]
1986 fn test_deterministic_panic_aborts_spawn() {
1987 let executor = deterministic::Runner::default();
1988 test_panic_aborts_spawn(executor);
1989 }
1990
1991 #[test]
1992 fn test_deterministic_panic_aborts_spawn_caught() {
1993 let cfg = deterministic::Config::default().with_catch_panics(true);
1994 let executor = deterministic::Runner::new(cfg);
1995 test_panic_aborts_spawn_caught(executor);
1996 }
1997
1998 #[test]
1999 #[should_panic(expected = "boom")]
2000 fn test_deterministic_multiple_panics() {
2001 let executor = deterministic::Runner::default();
2002 test_multiple_panics(executor);
2003 }
2004
2005 #[test]
2006 fn test_deterministic_multiple_panics_caught() {
2007 let cfg = deterministic::Config::default().with_catch_panics(true);
2008 let executor = deterministic::Runner::new(cfg);
2009 test_multiple_panics_caught(executor);
2010 }
2011
2012 #[test]
2013 fn test_deterministic_select() {
2014 let executor = deterministic::Runner::default();
2015 test_select(executor);
2016 }
2017
2018 #[test]
2019 fn test_deterministic_select_loop() {
2020 let executor = deterministic::Runner::default();
2021 test_select_loop(executor);
2022 }
2023
2024 #[test]
2025 fn test_deterministic_storage_operations() {
2026 let executor = deterministic::Runner::default();
2027 test_storage_operations(executor);
2028 }
2029
2030 #[test]
2031 fn test_deterministic_blob_read_write() {
2032 let executor = deterministic::Runner::default();
2033 test_blob_read_write(executor);
2034 }
2035
2036 #[test]
2037 fn test_deterministic_blob_resize() {
2038 let executor = deterministic::Runner::default();
2039 test_blob_resize(executor);
2040 }
2041
2042 #[test]
2043 fn test_deterministic_many_partition_read_write() {
2044 let executor = deterministic::Runner::default();
2045 test_many_partition_read_write(executor);
2046 }
2047
2048 #[test]
2049 fn test_deterministic_blob_read_past_length() {
2050 let executor = deterministic::Runner::default();
2051 test_blob_read_past_length(executor);
2052 }
2053
2054 #[test]
2055 fn test_deterministic_blob_clone_and_concurrent_read() {
2056 let executor = deterministic::Runner::default();
2058 test_blob_clone_and_concurrent_read(executor);
2059 }
2060
2061 #[test]
2062 fn test_deterministic_shutdown() {
2063 let executor = deterministic::Runner::default();
2064 test_shutdown(executor);
2065 }
2066
2067 #[test]
2068 fn test_deterministic_shutdown_multiple_signals() {
2069 let executor = deterministic::Runner::default();
2070 test_shutdown_multiple_signals(executor);
2071 }
2072
2073 #[test]
2074 fn test_deterministic_shutdown_timeout() {
2075 let executor = deterministic::Runner::default();
2076 test_shutdown_timeout(executor);
2077 }
2078
2079 #[test]
2080 fn test_deterministic_shutdown_multiple_stop_calls() {
2081 let executor = deterministic::Runner::default();
2082 test_shutdown_multiple_stop_calls(executor);
2083 }
2084
2085 #[test]
2086 fn test_deterministic_unfulfilled_shutdown() {
2087 let executor = deterministic::Runner::default();
2088 test_unfulfilled_shutdown(executor);
2089 }
2090
2091 #[test]
2092 fn test_deterministic_spawn_dedicated() {
2093 let executor = deterministic::Runner::default();
2094 test_spawn_dedicated(executor);
2095 }
2096
2097 #[test]
2098 fn test_deterministic_spawn() {
2099 let runner = deterministic::Runner::default();
2100 test_spawn(runner);
2101 }
2102
2103 #[test]
2104 fn test_deterministic_spawn_abort_on_parent_abort() {
2105 let runner = deterministic::Runner::default();
2106 test_spawn_abort_on_parent_abort(runner);
2107 }
2108
2109 #[test]
2110 fn test_deterministic_spawn_abort_on_parent_completion() {
2111 let runner = deterministic::Runner::default();
2112 test_spawn_abort_on_parent_completion(runner);
2113 }
2114
2115 #[test]
2116 fn test_deterministic_spawn_cascading_abort() {
2117 let runner = deterministic::Runner::default();
2118 test_spawn_cascading_abort(runner);
2119 }
2120
2121 #[test]
2122 fn test_deterministic_child_survives_sibling_completion() {
2123 let runner = deterministic::Runner::default();
2124 test_child_survives_sibling_completion(runner);
2125 }
2126
2127 #[test]
2128 fn test_deterministic_spawn_clone_chain() {
2129 let runner = deterministic::Runner::default();
2130 test_spawn_clone_chain(runner);
2131 }
2132
2133 #[test]
2134 fn test_deterministic_spawn_sparse_clone_chain() {
2135 let runner = deterministic::Runner::default();
2136 test_spawn_sparse_clone_chain(runner);
2137 }
2138
2139 #[test]
2140 fn test_deterministic_spawn_blocking() {
2141 for dedicated in [false, true] {
2142 let executor = deterministic::Runner::default();
2143 test_spawn_blocking(executor, dedicated);
2144 }
2145 }
2146
2147 #[test]
2148 #[should_panic(expected = "blocking task panicked")]
2149 fn test_deterministic_spawn_blocking_panic() {
2150 for dedicated in [false, true] {
2151 let executor = deterministic::Runner::default();
2152 test_spawn_blocking_panic(executor, dedicated);
2153 }
2154 }
2155
2156 #[test]
2157 fn test_deterministic_spawn_blocking_panic_caught() {
2158 for dedicated in [false, true] {
2159 let cfg = deterministic::Config::default().with_catch_panics(true);
2160 let executor = deterministic::Runner::new(cfg);
2161 test_spawn_blocking_panic_caught(executor, dedicated);
2162 }
2163 }
2164
2165 #[test]
2166 fn test_deterministic_spawn_blocking_abort() {
2167 for (dedicated, blocking) in [(false, true), (true, false)] {
2168 let executor = deterministic::Runner::default();
2169 test_spawn_abort(executor, dedicated, blocking);
2170 }
2171 }
2172
2173 #[test]
2174 fn test_deterministic_circular_reference_prevents_cleanup() {
2175 let executor = deterministic::Runner::default();
2176 test_circular_reference_prevents_cleanup(executor);
2177 }
2178
2179 #[test]
2180 fn test_deterministic_late_waker() {
2181 let executor = deterministic::Runner::default();
2182 test_late_waker(executor);
2183 }
2184
2185 #[test]
2186 fn test_deterministic_metrics() {
2187 let executor = deterministic::Runner::default();
2188 test_metrics(executor);
2189 }
2190
2191 #[test]
2192 #[should_panic]
2193 fn test_deterministic_metrics_label() {
2194 let executor = deterministic::Runner::default();
2195 test_metrics_label(executor);
2196 }
2197
2198 #[test]
2199 fn test_tokio_error_future() {
2200 let runner = tokio::Runner::default();
2201 test_error_future(runner);
2202 }
2203
2204 #[test]
2205 fn test_tokio_clock_sleep() {
2206 let executor = tokio::Runner::default();
2207 test_clock_sleep(executor);
2208 }
2209
2210 #[test]
2211 fn test_tokio_clock_sleep_until() {
2212 let executor = tokio::Runner::default();
2213 test_clock_sleep_until(executor);
2214 }
2215
2216 #[test]
2217 fn test_tokio_clock_timeout() {
2218 let executor = tokio::Runner::default();
2219 test_clock_timeout(executor);
2220 }
2221
2222 #[test]
2223 fn test_tokio_root_finishes() {
2224 let executor = tokio::Runner::default();
2225 test_root_finishes(executor);
2226 }
2227
2228 #[test]
2229 fn test_tokio_spawn_after_abort() {
2230 let executor = tokio::Runner::default();
2231 test_spawn_after_abort(executor);
2232 }
2233
2234 #[test]
2235 fn test_tokio_spawn_abort() {
2236 let executor = tokio::Runner::default();
2237 test_spawn_abort(executor, false, false);
2238 }
2239
2240 #[test]
2241 #[should_panic(expected = "blah")]
2242 fn test_tokio_panic_aborts_root() {
2243 let executor = tokio::Runner::default();
2244 test_panic_aborts_root(executor);
2245 }
2246
2247 #[test]
2248 #[should_panic(expected = "blah")]
2249 fn test_tokio_panic_aborts_root_caught() {
2250 let cfg = tokio::Config::default().with_catch_panics(true);
2251 let executor = tokio::Runner::new(cfg);
2252 test_panic_aborts_root(executor);
2253 }
2254
2255 #[test]
2256 #[should_panic(expected = "blah")]
2257 fn test_tokio_panic_aborts_spawn() {
2258 let executor = tokio::Runner::default();
2259 test_panic_aborts_spawn(executor);
2260 }
2261
2262 #[test]
2263 fn test_tokio_panic_aborts_spawn_caught() {
2264 let cfg = tokio::Config::default().with_catch_panics(true);
2265 let executor = tokio::Runner::new(cfg);
2266 test_panic_aborts_spawn_caught(executor);
2267 }
2268
2269 #[test]
2270 #[should_panic(expected = "boom")]
2271 fn test_tokio_multiple_panics() {
2272 let executor = tokio::Runner::default();
2273 test_multiple_panics(executor);
2274 }
2275
2276 #[test]
2277 fn test_tokio_multiple_panics_caught() {
2278 let cfg = tokio::Config::default().with_catch_panics(true);
2279 let executor = tokio::Runner::new(cfg);
2280 test_multiple_panics_caught(executor);
2281 }
2282
2283 #[test]
2284 fn test_tokio_select() {
2285 let executor = tokio::Runner::default();
2286 test_select(executor);
2287 }
2288
2289 #[test]
2290 fn test_tokio_select_loop() {
2291 let executor = tokio::Runner::default();
2292 test_select_loop(executor);
2293 }
2294
2295 #[test]
2296 fn test_tokio_storage_operations() {
2297 let executor = tokio::Runner::default();
2298 test_storage_operations(executor);
2299 }
2300
2301 #[test]
2302 fn test_tokio_blob_read_write() {
2303 let executor = tokio::Runner::default();
2304 test_blob_read_write(executor);
2305 }
2306
2307 #[test]
2308 fn test_tokio_blob_resize() {
2309 let executor = tokio::Runner::default();
2310 test_blob_resize(executor);
2311 }
2312
2313 #[test]
2314 fn test_tokio_many_partition_read_write() {
2315 let executor = tokio::Runner::default();
2316 test_many_partition_read_write(executor);
2317 }
2318
2319 #[test]
2320 fn test_tokio_blob_read_past_length() {
2321 let executor = tokio::Runner::default();
2322 test_blob_read_past_length(executor);
2323 }
2324
2325 #[test]
2326 fn test_tokio_blob_clone_and_concurrent_read() {
2327 let executor = tokio::Runner::default();
2329 test_blob_clone_and_concurrent_read(executor);
2330 }
2331
2332 #[test]
2333 fn test_tokio_shutdown() {
2334 let executor = tokio::Runner::default();
2335 test_shutdown(executor);
2336 }
2337
2338 #[test]
2339 fn test_tokio_shutdown_multiple_signals() {
2340 let executor = tokio::Runner::default();
2341 test_shutdown_multiple_signals(executor);
2342 }
2343
2344 #[test]
2345 fn test_tokio_shutdown_timeout() {
2346 let executor = tokio::Runner::default();
2347 test_shutdown_timeout(executor);
2348 }
2349
2350 #[test]
2351 fn test_tokio_shutdown_multiple_stop_calls() {
2352 let executor = tokio::Runner::default();
2353 test_shutdown_multiple_stop_calls(executor);
2354 }
2355
2356 #[test]
2357 fn test_tokio_unfulfilled_shutdown() {
2358 let executor = tokio::Runner::default();
2359 test_unfulfilled_shutdown(executor);
2360 }
2361
2362 #[test]
2363 fn test_tokio_spawn_dedicated() {
2364 let executor = tokio::Runner::default();
2365 test_spawn_dedicated(executor);
2366 }
2367
2368 #[test]
2369 fn test_tokio_spawn() {
2370 let runner = tokio::Runner::default();
2371 test_spawn(runner);
2372 }
2373
2374 #[test]
2375 fn test_tokio_spawn_abort_on_parent_abort() {
2376 let runner = tokio::Runner::default();
2377 test_spawn_abort_on_parent_abort(runner);
2378 }
2379
2380 #[test]
2381 fn test_tokio_spawn_abort_on_parent_completion() {
2382 let runner = tokio::Runner::default();
2383 test_spawn_abort_on_parent_completion(runner);
2384 }
2385
2386 #[test]
2387 fn test_tokio_spawn_cascading_abort() {
2388 let runner = tokio::Runner::default();
2389 test_spawn_cascading_abort(runner);
2390 }
2391
2392 #[test]
2393 fn test_tokio_child_survives_sibling_completion() {
2394 let runner = tokio::Runner::default();
2395 test_child_survives_sibling_completion(runner);
2396 }
2397
2398 #[test]
2399 fn test_tokio_spawn_clone_chain() {
2400 let runner = tokio::Runner::default();
2401 test_spawn_clone_chain(runner);
2402 }
2403
2404 #[test]
2405 fn test_tokio_spawn_sparse_clone_chain() {
2406 let runner = tokio::Runner::default();
2407 test_spawn_sparse_clone_chain(runner);
2408 }
2409
2410 #[test]
2411 fn test_tokio_spawn_blocking() {
2412 for dedicated in [false, true] {
2413 let executor = tokio::Runner::default();
2414 test_spawn_blocking(executor, dedicated);
2415 }
2416 }
2417
2418 #[test]
2419 #[should_panic(expected = "blocking task panicked")]
2420 fn test_tokio_spawn_blocking_panic() {
2421 for dedicated in [false, true] {
2422 let executor = tokio::Runner::default();
2423 test_spawn_blocking_panic(executor, dedicated);
2424 }
2425 }
2426
2427 #[test]
2428 fn test_tokio_spawn_blocking_panic_caught() {
2429 for dedicated in [false, true] {
2430 let cfg = tokio::Config::default().with_catch_panics(true);
2431 let executor = tokio::Runner::new(cfg);
2432 test_spawn_blocking_panic_caught(executor, dedicated);
2433 }
2434 }
2435
2436 #[test]
2437 fn test_tokio_spawn_blocking_abort() {
2438 for (dedicated, blocking) in [(false, true), (true, false)] {
2439 let executor = tokio::Runner::default();
2440 test_spawn_abort(executor, dedicated, blocking);
2441 }
2442 }
2443
2444 #[test]
2445 fn test_tokio_circular_reference_prevents_cleanup() {
2446 let executor = tokio::Runner::default();
2447 test_circular_reference_prevents_cleanup(executor);
2448 }
2449
2450 #[test]
2451 fn test_tokio_late_waker() {
2452 let executor = tokio::Runner::default();
2453 test_late_waker(executor);
2454 }
2455
2456 #[test]
2457 fn test_tokio_metrics() {
2458 let executor = tokio::Runner::default();
2459 test_metrics(executor);
2460 }
2461
2462 #[test]
2463 #[should_panic]
2464 fn test_tokio_metrics_label() {
2465 let executor = tokio::Runner::default();
2466 test_metrics_label(executor);
2467 }
2468
2469 #[test]
2470 fn test_tokio_process_rss_metric() {
2471 let executor = tokio::Runner::default();
2472 executor.start(|context| async move {
2473 loop {
2474 let metrics = context.encode();
2476 if !metrics.contains("runtime_process_rss") {
2477 context.sleep(Duration::from_millis(100)).await;
2478 continue;
2479 }
2480
2481 for line in metrics.lines() {
2483 if line.starts_with("runtime_process_rss")
2484 && !line.starts_with("runtime_process_rss{")
2485 {
2486 let parts: Vec<&str> = line.split_whitespace().collect();
2487 if parts.len() >= 2 {
2488 let rss_value: i64 =
2489 parts[1].parse().expect("Failed to parse RSS value");
2490 if rss_value > 0 {
2491 return;
2492 }
2493 }
2494 }
2495 }
2496 }
2497 });
2498 }
2499
2500 #[test]
2501 fn test_tokio_telemetry() {
2502 let executor = tokio::Runner::default();
2503 executor.start(|context| async move {
2504 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
2506
2507 tokio::telemetry::init(
2509 context.with_label("metrics"),
2510 tokio::telemetry::Logging {
2511 level: Level::INFO,
2512 json: false,
2513 },
2514 Some(address),
2515 None,
2516 );
2517
2518 let counter: Counter<u64> = Counter::default();
2520 context.register("test_counter", "Test counter", counter.clone());
2521 counter.inc();
2522
2523 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
2525 let mut line = Vec::new();
2526 loop {
2527 let byte = stream.recv(vec![0; 1]).await?;
2528 if byte[0] == b'\n' {
2529 if line.last() == Some(&b'\r') {
2530 line.pop(); }
2532 break;
2533 }
2534 line.push(byte[0]);
2535 }
2536 String::from_utf8(line).map_err(|_| Error::ReadFailed)
2537 }
2538
2539 async fn read_headers<St: Stream>(
2540 stream: &mut St,
2541 ) -> Result<HashMap<String, String>, Error> {
2542 let mut headers = HashMap::new();
2543 loop {
2544 let line = read_line(stream).await?;
2545 if line.is_empty() {
2546 break;
2547 }
2548 let parts: Vec<&str> = line.splitn(2, ": ").collect();
2549 if parts.len() == 2 {
2550 headers.insert(parts[0].to_string(), parts[1].to_string());
2551 }
2552 }
2553 Ok(headers)
2554 }
2555
2556 async fn read_body<St: Stream>(
2557 stream: &mut St,
2558 content_length: usize,
2559 ) -> Result<String, Error> {
2560 let read = stream.recv(vec![0; content_length]).await?;
2561 String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
2562 }
2563
2564 let client_handle = context
2566 .with_label("client")
2567 .spawn(move |context| async move {
2568 let (mut sink, mut stream) = loop {
2569 match context.dial(address).await {
2570 Ok((sink, stream)) => break (sink, stream),
2571 Err(e) => {
2572 error!(err =?e, "failed to connect");
2574 context.sleep(Duration::from_millis(10)).await;
2575 }
2576 }
2577 };
2578
2579 let request = format!(
2581 "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
2582 );
2583 sink.send(Bytes::from(request).to_vec()).await.unwrap();
2584
2585 let status_line = read_line(&mut stream).await.unwrap();
2587 assert_eq!(status_line, "HTTP/1.1 200 OK");
2588
2589 let headers = read_headers(&mut stream).await.unwrap();
2591 println!("Headers: {headers:?}");
2592 let content_length = headers
2593 .get("content-length")
2594 .unwrap()
2595 .parse::<usize>()
2596 .unwrap();
2597
2598 let body = read_body(&mut stream, content_length).await.unwrap();
2600 assert!(body.contains("test_counter_total 1"));
2601 });
2602
2603 client_handle.await.unwrap();
2605 });
2606 }
2607
2608 #[test_collect_traces]
2609 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2610 let executor = deterministic::Runner::new(deterministic::Config::default());
2611 executor.start(|context| async move {
2612 context
2613 .with_label("test")
2614 .instrumented()
2615 .spawn(|context| async move {
2616 tracing::info!(field = "test field", "test log");
2617
2618 context
2619 .with_label("inner")
2620 .instrumented()
2621 .spawn(|_| async move {
2622 tracing::info!("inner log");
2623 })
2624 .await
2625 .unwrap();
2626 })
2627 .await
2628 .unwrap();
2629 });
2630
2631 let info_traces = traces.get_by_level(Level::INFO);
2632 assert_eq!(info_traces.len(), 2);
2633
2634 info_traces
2636 .expect_event_at_index(0, |event| {
2637 event.metadata.expect_content_exact("test log")?;
2638 event.metadata.expect_field_count(1)?;
2639 event.metadata.expect_field_exact("field", "test field")?;
2640 event.expect_span_count(1)?;
2641 event.expect_span_at_index(0, |span| {
2642 span.expect_content_exact("task")?;
2643 span.expect_field_count(1)?;
2644 span.expect_field_exact("name", "test")
2645 })
2646 })
2647 .unwrap();
2648
2649 info_traces
2650 .expect_event_at_index(1, |event| {
2651 event.metadata.expect_content_exact("inner log")?;
2652 event.metadata.expect_field_count(0)?;
2653 event.expect_span_count(1)?;
2654 event.expect_span_at_index(0, |span| {
2655 span.expect_content_exact("task")?;
2656 span.expect_field_count(1)?;
2657 span.expect_field_exact("name", "test_inner")
2658 })
2659 })
2660 .unwrap();
2661 }
2662}