1#![doc(
21 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22 html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_utils::StableBuf;
26use prometheus_client::registry::Metric;
27use std::{
28 future::Future,
29 io::Error as IoError,
30 net::SocketAddr,
31 time::{Duration, SystemTime},
32};
33use thiserror::Error;
34
// `#[macro_use]` keeps the `macro_rules!` macros from `macros` in scope for the code and
// modules that follow, so this declaration has to stay first.
#[macro_use]
mod macros;

pub mod deterministic;
pub mod mocks;
// `tokio` and `benchmarks` are only compiled for targets other than wasm32.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        pub mod tokio;
        pub mod benchmarks;
    }
}
mod network;
mod process;
mod storage;
pub mod telemetry;
mod utils;
// Every public item of the private `utils` module is re-exported from here.
pub use utils::*;
// Only compiled when at least one of the io_uring features is enabled.
#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
mod iouring;
54
/// Label prefix that callers may not use: [`Metrics::scoped_label`] panics when the scoped
/// label it builds starts with this string.
const METRICS_PREFIX: &str = "runtime";
57
/// Errors returned by the runtime interfaces declared in this module.
#[derive(Error, Debug)]
pub enum Error {
    /// Displayed as "exited". The tests in this file expect it when awaiting the handle of
    /// a task that panicked.
    #[error("exited")]
    Exited,
    /// Displayed as "closed". The tests in this file expect it when awaiting the handle of
    /// a task that was aborted.
    #[error("closed")]
    Closed,
    /// Displayed as "timeout". The tests in this file expect it from [`Spawner::stop`] when
    /// the supplied timeout runs out first.
    #[error("timeout")]
    Timeout,
    /// A bind operation failed.
    #[error("bind failed")]
    BindFailed,
    /// A connection attempt failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// A send failed.
    #[error("send failed")]
    SendFailed,
    /// A receive failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the string is included in the message.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist; the string is included in the message. The tests in this
    /// file expect it from [`Storage::scan`] on a removed partition.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt; the string is included in the message.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed. The message prints the two strings as `{0}/{1}` (presumably
    /// partition and blob name, confirm against the producers) followed by the I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist; the two strings are printed as `{0}/{1}`.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Resizing a blob failed; same layout as [`Error::BlobOpenFailed`].
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    /// Syncing a blob failed; same layout as [`Error::BlobOpenFailed`].
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// A blob is shorter than the operation requires.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
    /// Any other I/O error. `#[from]` lets `?` convert a [`std::io::Error`] into this variant.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
100
/// Entry point of a runtime: consumes itself to run a root future.
pub trait Runner {
    /// Value passed to the root closure given to [`Runner::start`]. The tests in this file
    /// require it to implement traits such as [`Spawner`], [`Clock`], [`Storage`] and
    /// [`Metrics`].
    type Context;

    /// Calls `f` with a [`Runner::Context`] and returns the output of the future `f` produces.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
117
/// Interface for spawning tasks and for signalling shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Consumes the spawner, passes it to `f`, and spawns the future `f` returns.
    ///
    /// The returned `Handle` resolves to the task's output. The tests in this file expect
    /// `Err(Error::Closed)` after `abort()` and `Err(Error::Exited)` after a panic.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Borrows the spawner mutably and returns a closure that spawns the future given to it.
    ///
    /// Unlike [`Spawner::spawn`], the spawned future is passed directly and does not
    /// receive a context.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Same signature as [`Spawner::spawn`], but the task is tied to the task that owns
    /// this context.
    ///
    /// The tests in this file expect the child's handle to resolve to an error once the
    /// parent task is aborted or completes.
    fn spawn_child<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Spawns the synchronous closure `f`, which receives the consumed spawner.
    ///
    /// NOTE(review): the meaning of `dedicated` is not visible in this file; presumably it
    /// selects a dedicated thread for the closure. Confirm against the implementations.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// By-reference variant of [`Spawner::spawn_blocking`]: returns a closure that spawns
    /// the context-free synchronous closure given to it.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals shutdown with `value` and resolves once shutdown has finished.
    ///
    /// The tests in this file expect `Err(Error::Timeout)` when `timeout` is `Some` and a
    /// task never finishes, and expect later calls with a different value to succeed
    /// without changing the value observed through [`Spawner::stopped`].
    fn stop(
        self,
        value: i32,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns a signal that the tests in this file await to obtain the `value` passed to
    /// [`Spawner::stop`].
    fn stopped(&self) -> signal::Signal;
}
243
244pub trait Metrics: Clone + Send + Sync + 'static {
246 fn label(&self) -> String;
248
249 fn with_label(&self, label: &str) -> Self;
257
258 fn scoped_label(&self, label: &str) -> String {
262 let label = if self.label().is_empty() {
263 label.to_string()
264 } else {
265 format!("{}_{}", self.label(), label)
266 };
267 assert!(
268 !label.starts_with(METRICS_PREFIX),
269 "using runtime label is not allowed"
270 );
271 label
272 }
273
274 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
278
279 fn encode(&self) -> String;
281}
282
/// Interface for reading the runtime's time and for sleeping.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that the tests in this file expect to resolve no earlier than
    /// `duration` after the call, measured with [`Clock::current`].
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that sleeps until `deadline`.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
298
/// The [`Sink`] type of the [`Listener`] belonging to network `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
301
/// The [`Stream`] type of the [`Listener`] belonging to network `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
304
305pub type ListenerOf<N> = <N as crate::Network>::Listener;
307
/// Interface for binding to and dialing socket addresses.
pub trait Network: Clone + Send + Sync + 'static {
    /// Listener type returned by [`Network::bind`]. Its `Sink` and `Stream` types are also
    /// the ones returned by [`Network::dial`].
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener on success.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dials `socket` and resolves to the sink/stream pair of the new connection on success.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
328
329pub trait Listener: Sync + Send + 'static {
332 type Sink: Sink;
335 type Stream: Stream;
338
339 fn accept(
341 &mut self,
342 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
343
344 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
346}
347
/// Sending half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends `msg`, which may be anything convertible into a `StableBuf`.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
357
/// Receiving half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives into `buf` and resolves to a `StableBuf` on success.
    ///
    /// NOTE(review): presumably the returned buffer is `buf` filled to its full length;
    /// confirm against the implementations.
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
368
/// Interface for blobs grouped into named partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob type handed out by [`Storage::open`].
    type Blob: Blob;

    /// Opens the blob `name` in `partition`.
    ///
    /// Resolves to the blob and a `u64`; the tests in this file expect that number to be
    /// the blob's current length. They also open blobs that were never created before and
    /// expect that to succeed.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition`, or the whole partition when `name` is
    /// `None`.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Lists the names of the blobs in `partition`. The tests in this file expect
    /// `Error::PartitionMissing` once the partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
403
404#[allow(clippy::len_without_is_empty)]
419pub trait Blob: Clone + Send + Sync + 'static {
420 fn read_at(
425 &self,
426 buf: impl Into<StableBuf> + Send,
427 offset: u64,
428 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
429
430 fn write_at(
432 &self,
433 buf: impl Into<StableBuf> + Send,
434 offset: u64,
435 ) -> impl Future<Output = Result<(), Error>> + Send;
436
437 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
442
443 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use bytes::Bytes;
451 use commonware_macros::select;
452 use futures::{
453 channel::{mpsc, oneshot},
454 future::{pending, ready},
455 join, pin_mut, FutureExt, SinkExt, StreamExt,
456 };
457 use prometheus_client::metrics::counter::Counter;
458 use std::{
459 collections::HashMap,
460 panic::{catch_unwind, AssertUnwindSafe},
461 str::FromStr,
462 sync::{
463 atomic::{AtomicU32, Ordering},
464 Arc, Mutex,
465 },
466 };
467 use tracing::{error, Level};
468 use utils::reschedule;
469
470 fn test_error_future<R: Runner>(runner: R) {
471 async fn error_future() -> Result<&'static str, &'static str> {
472 Err("An error occurred")
473 }
474 let result = runner.start(|_| error_future());
475 assert_eq!(result, Err("An error occurred"));
476 }
477
478 fn test_clock_sleep<R: Runner>(runner: R)
479 where
480 R::Context: Spawner + Clock,
481 {
482 runner.start(|context| async move {
483 let start = context.current();
485 let sleep_duration = Duration::from_millis(10);
486 context.sleep(sleep_duration).await;
487
488 let end = context.current();
490 assert!(end.duration_since(start).unwrap() >= sleep_duration);
491 });
492 }
493
494 fn test_clock_sleep_until<R: Runner>(runner: R)
495 where
496 R::Context: Spawner + Clock,
497 {
498 runner.start(|context| async move {
499 let now = context.current();
501 context.sleep_until(now + Duration::from_millis(100)).await;
502
503 let elapsed = now.elapsed().unwrap();
505 assert!(elapsed >= Duration::from_millis(100));
506 });
507 }
508
509 fn test_root_finishes<R: Runner>(runner: R)
510 where
511 R::Context: Spawner,
512 {
513 runner.start(|context| async move {
514 context.spawn(|_| async move {
515 loop {
516 reschedule().await;
517 }
518 });
519 });
520 }
521
522 fn test_spawn_abort<R: Runner>(runner: R)
523 where
524 R::Context: Spawner,
525 {
526 runner.start(|context| async move {
527 let handle = context.spawn(|_| async move {
528 loop {
529 reschedule().await;
530 }
531 });
532 handle.abort();
533 assert!(matches!(handle.await, Err(Error::Closed)));
534 });
535 }
536
537 fn test_panic_aborts_root<R: Runner>(runner: R) {
538 let result = catch_unwind(AssertUnwindSafe(|| {
539 runner.start(|_| async move {
540 panic!("blah");
541 });
542 }));
543 result.unwrap_err();
544 }
545
546 fn test_panic_aborts_spawn<R: Runner>(runner: R)
547 where
548 R::Context: Spawner,
549 {
550 let result = runner.start(|context| async move {
551 let result = context.spawn(|_| async move {
552 panic!("blah");
553 });
554 assert!(matches!(result.await, Err(Error::Exited)));
555 Result::<(), Error>::Ok(())
556 });
557
558 result.unwrap();
560 }
561
562 fn test_select<R: Runner>(runner: R) {
563 runner.start(|_| async move {
564 let output = Mutex::new(0);
566 select! {
567 v1 = ready(1) => {
568 *output.lock().unwrap() = v1;
569 },
570 v2 = ready(2) => {
571 *output.lock().unwrap() = v2;
572 },
573 };
574 assert_eq!(*output.lock().unwrap(), 1);
575
576 select! {
578 v1 = std::future::pending::<i32>() => {
579 *output.lock().unwrap() = v1;
580 },
581 v2 = ready(2) => {
582 *output.lock().unwrap() = v2;
583 },
584 };
585 assert_eq!(*output.lock().unwrap(), 2);
586 });
587 }
588
589 fn test_select_loop<R: Runner>(runner: R)
591 where
592 R::Context: Clock,
593 {
594 runner.start(|context| async move {
595 let (mut sender, mut receiver) = mpsc::unbounded();
597 for _ in 0..2 {
598 select! {
599 v = receiver.next() => {
600 panic!("unexpected value: {v:?}");
601 },
602 _ = context.sleep(Duration::from_millis(100)) => {
603 continue;
604 },
605 };
606 }
607
608 sender.send(0).await.unwrap();
610 sender.send(1).await.unwrap();
611
612 select! {
614 _ = async {} => {
615 },
617 v = receiver.next() => {
618 panic!("unexpected value: {v:?}");
619 },
620 };
621
622 for i in 0..2 {
624 select! {
625 _ = context.sleep(Duration::from_millis(100)) => {
626 panic!("timeout");
627 },
628 v = receiver.next() => {
629 assert_eq!(v.unwrap(), i);
630 },
631 };
632 }
633 });
634 }
635
636 fn test_storage_operations<R: Runner>(runner: R)
637 where
638 R::Context: Storage,
639 {
640 runner.start(|context| async move {
641 let partition = "test_partition";
642 let name = b"test_blob";
643
644 let (blob, _) = context
646 .open(partition, name)
647 .await
648 .expect("Failed to open blob");
649
650 let data = b"Hello, Storage!";
652 blob.write_at(Vec::from(data), 0)
653 .await
654 .expect("Failed to write to blob");
655
656 blob.sync().await.expect("Failed to sync blob");
658
659 let read = blob
661 .read_at(vec![0; data.len()], 0)
662 .await
663 .expect("Failed to read from blob");
664 assert_eq!(read.as_ref(), data);
665
666 blob.sync().await.expect("Failed to sync blob");
668
669 let blobs = context
671 .scan(partition)
672 .await
673 .expect("Failed to scan partition");
674 assert!(blobs.contains(&name.to_vec()));
675
676 let (blob, len) = context
678 .open(partition, name)
679 .await
680 .expect("Failed to reopen blob");
681 assert_eq!(len, data.len() as u64);
682
683 let read = blob
685 .read_at(vec![0u8; 7], 7)
686 .await
687 .expect("Failed to read data");
688 assert_eq!(read.as_ref(), b"Storage");
689
690 blob.sync().await.expect("Failed to sync blob");
692
693 context
695 .remove(partition, Some(name))
696 .await
697 .expect("Failed to remove blob");
698
699 let blobs = context
701 .scan(partition)
702 .await
703 .expect("Failed to scan partition");
704 assert!(!blobs.contains(&name.to_vec()));
705
706 context
708 .remove(partition, None)
709 .await
710 .expect("Failed to remove partition");
711
712 let result = context.scan(partition).await;
714 assert!(matches!(result, Err(Error::PartitionMissing(_))));
715 });
716 }
717
718 fn test_blob_read_write<R: Runner>(runner: R)
719 where
720 R::Context: Storage,
721 {
722 runner.start(|context| async move {
723 let partition = "test_partition";
724 let name = b"test_blob_rw";
725
726 let (blob, _) = context
728 .open(partition, name)
729 .await
730 .expect("Failed to open blob");
731
732 let data1 = b"Hello";
734 let data2 = b"World";
735 blob.write_at(Vec::from(data1), 0)
736 .await
737 .expect("Failed to write data1");
738 blob.write_at(Vec::from(data2), 5)
739 .await
740 .expect("Failed to write data2");
741
742 let read = blob
744 .read_at(vec![0u8; 10], 0)
745 .await
746 .expect("Failed to read data");
747 assert_eq!(&read.as_ref()[..5], data1);
748 assert_eq!(&read.as_ref()[5..], data2);
749
750 let result = blob.read_at(vec![0u8; 10], 10).await;
752 assert!(result.is_err());
753
754 let data3 = b"Store";
756 blob.write_at(Vec::from(data3), 5)
757 .await
758 .expect("Failed to write data3");
759
760 let read = blob
762 .read_at(vec![0u8; 10], 0)
763 .await
764 .expect("Failed to read data");
765 assert_eq!(&read.as_ref()[..5], data1);
766 assert_eq!(&read.as_ref()[5..], data3);
767
768 let result = blob.read_at(vec![0u8; 10], 10).await;
770 assert!(result.is_err());
771 });
772 }
773
774 fn test_blob_resize<R: Runner>(runner: R)
775 where
776 R::Context: Storage,
777 {
778 runner.start(|context| async move {
779 let partition = "test_partition_resize";
780 let name = b"test_blob_resize";
781
782 let (blob, _) = context
784 .open(partition, name)
785 .await
786 .expect("Failed to open blob");
787
788 let data = b"some data";
789 blob.write_at(data.to_vec(), 0)
790 .await
791 .expect("Failed to write");
792 blob.sync().await.expect("Failed to sync after write");
793
794 let (blob, len) = context.open(partition, name).await.unwrap();
796 assert_eq!(len, data.len() as u64);
797
798 let new_len = (data.len() as u64) * 2;
800 blob.resize(new_len)
801 .await
802 .expect("Failed to resize to extend");
803 blob.sync().await.expect("Failed to sync after resize");
804
805 let (blob, len) = context.open(partition, name).await.unwrap();
807 assert_eq!(len, new_len);
808
809 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
811 assert_eq!(read_buf.as_ref(), data);
812
813 let extended_part = blob
815 .read_at(vec![0; data.len()], data.len() as u64)
816 .await
817 .unwrap();
818 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
819
820 blob.resize(data.len() as u64).await.unwrap();
822 blob.sync().await.unwrap();
823
824 let (blob, size) = context.open(partition, name).await.unwrap();
826 assert_eq!(size, data.len() as u64);
827
828 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
830 assert_eq!(read_buf.as_ref(), data);
831 blob.sync().await.unwrap();
832 });
833 }
834
835 fn test_many_partition_read_write<R: Runner>(runner: R)
836 where
837 R::Context: Storage,
838 {
839 runner.start(|context| async move {
840 let partitions = ["partition1", "partition2", "partition3"];
841 let name = b"test_blob_rw";
842 let data1 = b"Hello";
843 let data2 = b"World";
844
845 for (additional, partition) in partitions.iter().enumerate() {
846 let (blob, _) = context
848 .open(partition, name)
849 .await
850 .expect("Failed to open blob");
851
852 blob.write_at(Vec::from(data1), 0)
854 .await
855 .expect("Failed to write data1");
856 blob.write_at(Vec::from(data2), 5 + additional as u64)
857 .await
858 .expect("Failed to write data2");
859
860 blob.sync().await.expect("Failed to sync blob");
862 }
863
864 for (additional, partition) in partitions.iter().enumerate() {
865 let (blob, len) = context
867 .open(partition, name)
868 .await
869 .expect("Failed to open blob");
870 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
871
872 let read = blob
874 .read_at(vec![0u8; 10 + additional], 0)
875 .await
876 .expect("Failed to read data");
877 assert_eq!(&read.as_ref()[..5], b"Hello");
878 assert_eq!(&read.as_ref()[5 + additional..], b"World");
879 }
880 });
881 }
882
883 fn test_blob_read_past_length<R: Runner>(runner: R)
884 where
885 R::Context: Storage,
886 {
887 runner.start(|context| async move {
888 let partition = "test_partition";
889 let name = b"test_blob_rw";
890
891 let (blob, _) = context
893 .open(partition, name)
894 .await
895 .expect("Failed to open blob");
896
897 let result = blob.read_at(vec![0u8; 10], 0).await;
899 assert!(result.is_err());
900
901 let data = b"Hello, Storage!".to_vec();
903 blob.write_at(data, 0)
904 .await
905 .expect("Failed to write to blob");
906
907 let result = blob.read_at(vec![0u8; 20], 0).await;
909 assert!(result.is_err());
910 })
911 }
912
913 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
914 where
915 R::Context: Spawner + Storage + Metrics,
916 {
917 runner.start(|context| async move {
918 let partition = "test_partition";
919 let name = b"test_blob_rw";
920
921 let (blob, _) = context
923 .open(partition, name)
924 .await
925 .expect("Failed to open blob");
926
927 let data = b"Hello, Storage!";
929 blob.write_at(Vec::from(data), 0)
930 .await
931 .expect("Failed to write to blob");
932
933 blob.sync().await.expect("Failed to sync blob");
935
936 let check1 = context.with_label("check1").spawn({
938 let blob = blob.clone();
939 move |_| async move {
940 let read = blob
941 .read_at(vec![0u8; data.len()], 0)
942 .await
943 .expect("Failed to read from blob");
944 assert_eq!(read.as_ref(), data);
945 }
946 });
947 let check2 = context.with_label("check2").spawn({
948 let blob = blob.clone();
949 move |_| async move {
950 let read = blob
951 .read_at(vec![0; data.len()], 0)
952 .await
953 .expect("Failed to read from blob");
954 assert_eq!(read.as_ref(), data);
955 }
956 });
957
958 let result = join!(check1, check2);
960 assert!(result.0.is_ok());
961 assert!(result.1.is_ok());
962
963 let read = blob
965 .read_at(vec![0; data.len()], 0)
966 .await
967 .expect("Failed to read from blob");
968 assert_eq!(read.as_ref(), data);
969
970 drop(blob);
972
973 let buffer = context.encode();
975 assert!(buffer.contains("open_blobs 0"));
976 });
977 }
978
979 fn test_shutdown<R: Runner>(runner: R)
980 where
981 R::Context: Spawner + Metrics + Clock,
982 {
983 let kill = 9;
984 runner.start(|context| async move {
985 let before = context
987 .with_label("before")
988 .spawn(move |context| async move {
989 let mut signal = context.stopped();
990 let value = (&mut signal).await.unwrap();
991 assert_eq!(value, kill);
992 drop(signal);
993 });
994
995 let result = context.clone().stop(kill, None).await;
997 assert!(result.is_ok());
998
999 let after = context
1001 .with_label("after")
1002 .spawn(move |context| async move {
1003 let value = context.stopped().await.unwrap();
1005 assert_eq!(value, kill);
1006 });
1007
1008 let result = join!(before, after);
1010 assert!(result.0.is_ok());
1011 assert!(result.1.is_ok());
1012 });
1013 }
1014
/// Checks that `stop` resolves only after every task holding a stop signal has finished
/// its cleanup and dropped the signal.
fn test_shutdown_multiple_signals<R: Runner>(runner: R)
where
    R::Context: Spawner + Metrics + Clock,
{
    let kill = 42;
    runner.start(|context| async move {
        // One message per task once it holds its signal.
        let (started_tx, mut started_rx) = mpsc::channel(3);
        // Counts tasks that finished their cleanup.
        let counter = Arc::new(AtomicU32::new(0));

        // Builds a task that waits for the stop signal, sleeps for `cleanup_duration` to
        // simulate cleanup, bumps the counter, and only then drops the signal.
        let task = |cleanup_duration: Duration| {
            let context = context.clone();
            let counter = counter.clone();
            let mut started_tx = started_tx.clone();
            context.spawn(move |context| async move {
                // Obtain the signal before reporting readiness.
                let mut signal = context.stopped();
                started_tx.send(()).await.unwrap();

                // Wait for the stop value, then simulate cleanup.
                let value = (&mut signal).await.unwrap();
                assert_eq!(value, kill);
                context.sleep(cleanup_duration).await;
                counter.fetch_add(1, Ordering::SeqCst);

                // Dropped explicitly, only after cleanup is done.
                drop(signal);
            })
        };

        // Three tasks with different cleanup durations.
        let task1 = task(Duration::from_millis(10));
        let task2 = task(Duration::from_millis(20));
        let task3 = task(Duration::from_millis(30));

        // Wait until every task holds its signal.
        for _ in 0..3 {
            started_rx.next().await.unwrap();
        }

        // By the time `stop` resolves, all three cleanups must have been counted.
        context.stop(kill, None).await.unwrap();
        assert_eq!(counter.load(Ordering::SeqCst), 3);

        // Every task finished without error.
        let result = join!(task1, task2, task3);
        assert!(result.0.is_ok());
        assert!(result.1.is_ok());
        assert!(result.2.is_ok());
    });
}
1066
1067 fn test_shutdown_timeout<R: Runner>(runner: R)
1068 where
1069 R::Context: Spawner + Metrics + Clock,
1070 {
1071 let kill = 42;
1072 runner.start(|context| async move {
1073 let (started_tx, started_rx) = oneshot::channel();
1075
1076 context.clone().spawn(move |context| async move {
1078 let signal = context.stopped();
1079 started_tx.send(()).unwrap();
1080 pending::<()>().await;
1081 signal.await.unwrap();
1082 });
1083
1084 started_rx.await.unwrap();
1086 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1087
1088 assert!(matches!(result, Err(Error::Timeout)));
1090 });
1091 }
1092
/// Checks that concurrent and repeated `stop` calls all succeed and that the value of the
/// first call is the one observers see.
fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
where
    R::Context: Spawner + Metrics + Clock,
{
    let kill1 = 42;
    let kill2 = 43;

    runner.start(|context| async move {
        let (started_tx, started_rx) = oneshot::channel();
        let counter = Arc::new(AtomicU32::new(0));

        // A task that delays shutdown by sleeping after it receives the stop value.
        let task = context.with_label("blocking_task").spawn({
            let counter = counter.clone();
            move |context| async move {
                // Obtain the signal before reporting readiness.
                let mut signal = context.stopped();
                started_tx.send(()).unwrap();

                // Only the first stop value is expected here.
                let value = (&mut signal).await.unwrap();
                assert_eq!(value, kill1);
                context.sleep(Duration::from_millis(50)).await;

                // Record completion, then release the signal.
                counter.fetch_add(1, Ordering::SeqCst);
                drop(signal);
            }
        });

        // Wait until the task holds its signal.
        started_rx.await.unwrap();

        // Create two stop futures with different values; neither is polled yet.
        let stop_task1 = context.clone().stop(kill1, None);
        pin_mut!(stop_task1);
        let stop_task2 = context.clone().stop(kill2, None);
        pin_mut!(stop_task2);

        // A single poll of each must not complete while the task is still cleaning up.
        assert!(stop_task1.as_mut().now_or_never().is_none());
        assert!(stop_task2.as_mut().now_or_never().is_none());

        // Both eventually succeed.
        assert!(stop_task1.await.is_ok());
        assert!(stop_task2.await.is_ok());

        // Observers see the value of the first stop call.
        let sig = context.stopped().await;
        assert_eq!(sig.unwrap(), kill1);

        // The task finished exactly once.
        let result = task.await;
        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        // After shutdown has completed, another stop call resolves on its first poll.
        assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
    });
}
1154
1155 fn test_spawn_ref<R: Runner>(runner: R)
1156 where
1157 R::Context: Spawner,
1158 {
1159 runner.start(|mut context| async move {
1160 let handle = context.spawn_ref();
1161 let result = handle(async move { 42 }).await;
1162 assert!(matches!(result, Ok(42)));
1163 });
1164 }
1165
1166 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1167 where
1168 R::Context: Spawner,
1169 {
1170 runner.start(|mut context| async move {
1171 let handle = context.spawn_ref();
1172 let result = handle(async move { 42 }).await;
1173 assert!(matches!(result, Ok(42)));
1174
1175 let handle = context.spawn_ref();
1177 let result = handle(async move { 42 }).await;
1178 assert!(matches!(result, Ok(42)));
1179 });
1180 }
1181
1182 fn test_spawn_duplicate<R: Runner>(runner: R)
1183 where
1184 R::Context: Spawner,
1185 {
1186 runner.start(|mut context| async move {
1187 let handle = context.spawn_ref();
1188 let result = handle(async move { 42 }).await;
1189 assert!(matches!(result, Ok(42)));
1190
1191 context.spawn(|_| async move { 42 });
1193 });
1194 }
1195
1196 fn test_spawn_child<R: Runner>(runner: R)
1197 where
1198 R::Context: Spawner + Clock,
1199 {
1200 runner.start(|context| async move {
1201 let child_handle = Arc::new(Mutex::new(None));
1202 let child_handle2 = child_handle.clone();
1203
1204 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1205 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1206 let parent_handle = context.spawn(move |context| async move {
1207 let handle = context.spawn_child(|_| async {});
1209
1210 *child_handle2.lock().unwrap() = Some(handle);
1212
1213 parent_initialized_tx.send(()).unwrap();
1214
1215 parent_complete_rx.await.unwrap();
1217 });
1218
1219 parent_initialized_rx.await.unwrap();
1221
1222 let child_handle = child_handle.lock().unwrap().take().unwrap();
1224 assert!(child_handle.await.is_ok());
1225
1226 parent_complete_tx.send(()).unwrap();
1228
1229 assert!(parent_handle.await.is_ok());
1231 });
1232 }
1233
1234 fn test_spawn_child_abort_on_parent_abort<R: Runner>(runner: R)
1235 where
1236 R::Context: Spawner + Clock,
1237 {
1238 runner.start(|context| async move {
1239 let child_handle = Arc::new(Mutex::new(None));
1240 let child_handle2 = child_handle.clone();
1241
1242 let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
1243 let parent_handle = context.spawn(move |context| async move {
1244 let handle = context.spawn_child(|_| pending::<()>());
1246
1247 *child_handle2.lock().unwrap() = Some(handle);
1249
1250 parent_initialized_tx.send(()).unwrap();
1251
1252 pending::<()>().await
1254 });
1255
1256 parent_initialized_rx.await.unwrap();
1258
1259 parent_handle.abort();
1261 assert!(parent_handle.await.is_err());
1262
1263 let child_handle = child_handle.lock().unwrap().take().unwrap();
1265 assert!(child_handle.await.is_err());
1266 });
1267 }
1268
1269 fn test_spawn_child_abort_on_parent_completion<R: Runner>(runner: R)
1270 where
1271 R::Context: Spawner + Clock,
1272 {
1273 runner.start(|context| async move {
1274 let child_handle = Arc::new(Mutex::new(None));
1275 let child_handle2 = child_handle.clone();
1276
1277 let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
1278 let parent_handle = context.spawn(move |context| async move {
1279 let handle = context.spawn_child(|_| pending::<()>());
1281
1282 *child_handle2.lock().unwrap() = Some(handle);
1284
1285 parent_complete_rx.await.unwrap();
1287 });
1288
1289 parent_complete_tx.send(()).unwrap();
1291
1292 assert!(parent_handle.await.is_ok());
1294
1295 let child_handle = child_handle.lock().unwrap().take().unwrap();
1297 assert!(child_handle.await.is_err());
1298 });
1299 }
1300
/// Checks that aborting a root task ends its whole tree of children and grandchildren.
///
/// The root task spawns three children and each child spawns two grandchildren, giving
/// nine descendant handles in total.
fn test_spawn_child_cascading_abort<R: Runner>(runner: R)
where
    R::Context: Spawner + Clock,
{
    runner.start(|context| async move {
        // Collects the handle of every descendant.
        let handles = Arc::new(Mutex::new(Vec::new()));
        // One message per descendant once its handle has been stored.
        let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
        let root_task = {
            let handles = handles.clone();
            context.spawn(move |context| async move {
                for _ in 0..3 {
                    let handles2 = handles.clone();
                    let mut initialized_tx2 = initialized_tx.clone();
                    // Child: spawns two grandchildren, then blocks forever.
                    let handle = context.clone().spawn_child(move |context| async move {
                        for _ in 0..2 {
                            // Grandchild: blocks forever.
                            let handle = context.clone().spawn_child(|_| async {
                                pending::<()>().await;
                            });
                            handles2.lock().unwrap().push(handle);
                            initialized_tx2.send(()).await.unwrap();
                        }
                        pending::<()>().await;
                    });

                    handles.lock().unwrap().push(handle);
                    initialized_tx.send(()).await.unwrap();
                }

                // The root task itself also blocks forever.
                pending::<()>().await;
            })
        };

        // Wait until all nine descendants have reported in.
        for _ in 0..9 {
            initialized_rx.next().await.unwrap();
        }

        // All descendant handles are stored.
        assert_eq!(handles.lock().unwrap().len(), 9,);

        // Abort the root task; its handle must resolve to an error.
        root_task.abort();
        assert!(root_task.await.is_err());

        // Take the handles out first so the lock is not held across the awaits below.
        let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
        for handle in handles {
            assert!(handle.await.is_err());
        }
    });
}
1362
/// Checks that a child task keeps running after a sibling spawned from the same parent
/// context completes.
///
/// `use_spawn_ref` selects whether the sibling is started with `spawn_ref` or `spawn`.
fn test_child_survives_sibling_completion<R: Runner>(runner: R, use_spawn_ref: bool)
where
    R::Context: Spawner + Clock,
{
    runner.start(|context| async move {
        let (child_started_tx, child_started_rx) = oneshot::channel();
        let (child_complete_tx, child_complete_rx) = oneshot::channel();
        let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
        let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
        let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

        let parent = context.spawn(move |mut context| async move {
            // Child: reports that it started, then waits to be released. Its
            // `child_complete_rx.await.unwrap()` would panic if the child were ended before
            // the release is sent.
            context.clone().spawn_child(|_| async move {
                child_started_tx.send(()).unwrap();
                child_complete_rx.await.unwrap();
            });

            // Sibling: an ordinary task started from the same parent context.
            let sibling_task = async move {
                sibling_started_tx.send(()).unwrap();
                sibling_complete_rx.await.unwrap();
            };

            if use_spawn_ref {
                context.spawn_ref()(sibling_task);
            } else {
                context.spawn(|_| sibling_task);
            }

            // Keep the parent alive until the root task releases it.
            parent_complete_rx.await.unwrap();
        });

        // Wait for both tasks to start.
        child_started_rx.await.unwrap();
        sibling_started_rx.await.unwrap();

        // Release the sibling first.
        sibling_complete_tx.send(()).unwrap();

        // Releasing the child panics on `unwrap` if its receiver is already gone.
        child_complete_tx.send(()).unwrap();

        // Release the parent.
        parent_complete_tx.send(()).unwrap();

        // The parent must finish cleanly.
        assert!(parent.await.is_ok());
    });
}
1415
/// Checks that a task spawned from the original context does not affect a child that was
/// spawned under a parent created from a clone of that context.
fn test_clone_context_no_child_inheritance<R: Runner>(runner: R)
where
    R::Context: Spawner + Clock,
{
    runner.start(|context| async move {
        let (child_started_tx, child_started_rx) = oneshot::channel();
        let (child_complete_tx, child_complete_rx) = oneshot::channel();
        let (cloned_task_started_tx, cloned_task_started_rx) = oneshot::channel();
        let (cloned_task_complete_tx, cloned_task_complete_rx) = oneshot::channel();
        let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

        // The parent task is spawned from a clone of the root context.
        let cloned_context = context.clone();
        let parent = cloned_context.spawn(move |context| async move {
            // Child: reports that it started, then waits to be released.
            context.spawn_child(|_| async move {
                child_started_tx.send(()).unwrap();
                child_complete_rx.await.unwrap();
            });

            // Keep the parent alive until the root task releases it.
            parent_complete_rx.await.unwrap();
        });

        // A second task is spawned from the original context (the names of the channels
        // below call it the "cloned task").
        context.spawn(move |_| async move {
            cloned_task_started_tx.send(()).unwrap();
            cloned_task_complete_rx.await.unwrap();
        });

        // Wait for both tasks to start.
        child_started_rx.await.unwrap();
        cloned_task_started_rx.await.unwrap();

        // Let the second task complete first.
        cloned_task_complete_tx.send(()).unwrap();

        // Releasing the child panics on `unwrap` if its receiver is already gone.
        child_complete_tx.send(()).unwrap();

        // Release the parent.
        parent_complete_tx.send(()).unwrap();

        // The parent must finish cleanly.
        assert!(parent.await.is_ok());
    });
}
1464
1465 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1466 where
1467 R::Context: Spawner,
1468 {
1469 runner.start(|context| async move {
1470 let handle = context.spawn_blocking(dedicated, |_| 42);
1471 let result = handle.await;
1472 assert!(matches!(result, Ok(42)));
1473 });
1474 }
1475
1476 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1477 where
1478 R::Context: Spawner,
1479 {
1480 runner.start(|mut context| async move {
1481 let spawn = context.spawn_blocking_ref(dedicated);
1482 let handle = spawn(|| 42);
1483 let result = handle.await;
1484 assert!(matches!(result, Ok(42)));
1485 });
1486 }
1487
1488 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1489 where
1490 R::Context: Spawner,
1491 {
1492 runner.start(|mut context| async move {
1493 let spawn = context.spawn_blocking_ref(dedicated);
1494 let result = spawn(|| 42).await;
1495 assert!(matches!(result, Ok(42)));
1496
1497 context.spawn_blocking(dedicated, |_| 42);
1499 });
1500 }
1501
1502 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1503 where
1504 R::Context: Spawner,
1505 {
1506 runner.start(|context| async move {
1507 let (sender, mut receiver) = oneshot::channel();
1509 let handle = context.spawn_blocking(dedicated, move |_| {
1510 loop {
1512 if receiver.try_recv().is_ok() {
1513 break;
1514 }
1515 }
1516
1517 let mut count = 0;
1519 loop {
1520 count += 1;
1521 if count >= 100_000_000 {
1522 break;
1523 }
1524 }
1525 count
1526 });
1527
1528 handle.abort();
1534 sender.send(()).unwrap();
1535
1536 assert!(matches!(handle.await, Ok(100_000_000)));
1538 });
1539 }
1540
1541 fn test_metrics<R: Runner>(runner: R)
1542 where
1543 R::Context: Metrics,
1544 {
1545 runner.start(|context| async move {
1546 assert_eq!(context.label(), "");
1548
1549 let counter = Counter::<u64>::default();
1551 context.register("test", "test", counter.clone());
1552
1553 counter.inc();
1555
1556 let buffer = context.encode();
1558 assert!(buffer.contains("test_total 1"));
1559
1560 let context = context.with_label("nested");
1562 let nested_counter = Counter::<u64>::default();
1563 context.register("test", "test", nested_counter.clone());
1564
1565 nested_counter.inc();
1567
1568 let buffer = context.encode();
1570 assert!(buffer.contains("nested_test_total 1"));
1571 assert!(buffer.contains("test_total 1"));
1572 });
1573 }
1574
1575 fn test_metrics_label<R: Runner>(runner: R)
1576 where
1577 R::Context: Metrics,
1578 {
1579 runner.start(|context| async move {
1580 context.with_label(METRICS_PREFIX);
1581 })
1582 }
1583
/// Runs the shared `test_error_future` scenario on the deterministic runner.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1589
/// Runs the shared `test_clock_sleep` scenario on the deterministic runner.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1595
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1601
/// Runs the shared `test_root_finishes` scenario on the deterministic runner.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1607
/// Runs the shared `test_spawn_abort` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1613
/// Runs the shared `test_panic_aborts_root` scenario on the deterministic runner.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1619
/// Runs the shared `test_panic_aborts_spawn` scenario on the deterministic
/// runner; the test passes only if it panics with a message containing "blah".
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1626
/// Runs the shared `test_select` scenario on the deterministic runner.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1632
/// Runs the shared `test_select_loop` scenario on the deterministic runner.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1638
/// Runs the shared `test_storage_operations` scenario on the deterministic runner.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1644
/// Runs the shared `test_blob_read_write` scenario on the deterministic runner.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1650
/// Runs the shared `test_blob_resize` scenario on the deterministic runner.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
1656
/// Runs the shared `test_many_partition_read_write` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1662
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1668
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1675
/// Runs the shared `test_shutdown` scenario on the deterministic runner.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1681
/// Runs the shared `test_shutdown_multiple_signals` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
1687
/// Runs the shared `test_shutdown_timeout` scenario on the deterministic runner.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
1693
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
1699
/// Runs the shared `test_spawn_ref` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1705
/// Runs the shared `test_spawn_ref_duplicate` scenario on the deterministic
/// runner; the scenario is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1712
/// Runs the shared `test_spawn_duplicate` scenario on the deterministic runner;
/// the scenario is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1719
/// Runs the shared `test_spawn_child` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_child() {
    test_spawn_child(deterministic::Runner::default());
}
1725
/// Runs the shared `test_spawn_child_abort_on_parent_abort` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_spawn_child_abort_on_parent_abort() {
    test_spawn_child_abort_on_parent_abort(deterministic::Runner::default());
}
1731
/// Runs the shared `test_spawn_child_abort_on_parent_completion` scenario on
/// the deterministic runner.
#[test]
fn test_deterministic_spawn_child_abort_on_parent_completion() {
    test_spawn_child_abort_on_parent_completion(deterministic::Runner::default());
}
1737
/// Runs the shared `test_spawn_child_cascading_abort` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_spawn_child_cascading_abort() {
    test_spawn_child_cascading_abort(deterministic::Runner::default());
}
1743
/// Runs the shared `test_child_survives_sibling_completion` scenario on a fresh
/// deterministic runner for each value of `use_spawn_ref`.
#[test]
fn test_deterministic_child_survives_sibling_completion() {
    [false, true].into_iter().for_each(|use_spawn_ref| {
        test_child_survives_sibling_completion(deterministic::Runner::default(), use_spawn_ref);
    });
}
1751
/// Runs the shared `test_clone_context_no_child_inheritance` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_clone_context_no_child_inheritance() {
    test_clone_context_no_child_inheritance(deterministic::Runner::default());
}
1757
/// Runs the shared `test_spawn_blocking` scenario on a fresh deterministic
/// runner for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    });
}
1765
/// Verifies that a panic raised inside a blocking task propagates out of the
/// deterministic runner for both values of `dedicated`.
///
/// A test-level `#[should_panic]` is satisfied by the very first panic, which
/// ends the test before the second loop iteration runs. Each iteration is
/// therefore wrapped in `catch_unwind` and checked on its own.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            executor.start(|context| async move {
                let handle = context.spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                });
                handle.await.unwrap();
            });
        }));

        // The run must have panicked.
        let payload = match outcome {
            Ok(()) => panic!("expected a panic with dedicated = {dedicated}"),
            Err(payload) => payload,
        };

        // Panic payloads are `&str` for literal messages and `String` for
        // formatted ones; accept either, as `should_panic(expected = ..)` does.
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message with dedicated = {dedicated}: {message:?}"
        );
    }
}
1779
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh
/// deterministic runner for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated);
    });
}
1787
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh deterministic
/// runner for each value of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated);
    });
}
1795
/// Verifies that `test_spawn_blocking_ref_duplicate` panics on the
/// deterministic runner for both values of `dedicated`.
///
/// A test-level `#[should_panic]` is satisfied by the first iteration alone and
/// never reaches `dedicated = true`, so each iteration is wrapped in
/// `catch_unwind` and asserted individually.
#[test]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated = {dedicated}"
        );
    }
}
1804
/// Runs the shared `test_metrics` scenario on the deterministic runner.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1810
/// Runs the shared `test_metrics_label` scenario on the deterministic runner;
/// the scenario is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1817
/// Runs the shared `test_error_future` scenario on the tokio runner.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1823
/// Runs the shared `test_clock_sleep` scenario on the tokio runner.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1829
/// Runs the shared `test_clock_sleep_until` scenario on the tokio runner.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1835
/// Runs the shared `test_root_finishes` scenario on the tokio runner.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1841
/// Runs the shared `test_spawn_abort` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1847
/// Runs the shared `test_panic_aborts_root` scenario on the tokio runner.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1853
/// Runs the shared `test_panic_aborts_spawn` scenario on the tokio runner.
///
/// Unlike the deterministic counterpart, this test is not marked
/// `#[should_panic]`.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1859
/// Runs the shared `test_select` scenario on the tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1865
/// Runs the shared `test_select_loop` scenario on the tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1871
/// Runs the shared `test_storage_operations` scenario on the tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1877
/// Runs the shared `test_blob_read_write` scenario on the tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1883
/// Runs the shared `test_blob_resize` scenario on the tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
1889
/// Runs the shared `test_many_partition_read_write` scenario on the tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1895
/// Runs the shared `test_blob_read_past_length` scenario on the tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1901
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the tokio
/// runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1908
/// Runs the shared `test_shutdown` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1914
/// Runs the shared `test_shutdown_multiple_signals` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
1920
/// Runs the shared `test_shutdown_timeout` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
1926
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on the tokio
/// runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
1932
/// Runs the shared `test_spawn_ref` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1938
/// Runs the shared `test_spawn_ref_duplicate` scenario on the tokio runner; the
/// scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1945
/// Runs the shared `test_spawn_duplicate` scenario on the tokio runner; the
/// scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1952
/// Runs the shared `test_spawn_child` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_child() {
    test_spawn_child(tokio::Runner::default());
}
1958
/// Runs the shared `test_spawn_child_abort_on_parent_abort` scenario on the
/// tokio runner.
#[test]
fn test_tokio_spawn_child_abort_on_parent_abort() {
    test_spawn_child_abort_on_parent_abort(tokio::Runner::default());
}
1964
/// Runs the shared `test_spawn_child_abort_on_parent_completion` scenario on
/// the tokio runner.
#[test]
fn test_tokio_spawn_child_abort_on_parent_completion() {
    test_spawn_child_abort_on_parent_completion(tokio::Runner::default());
}
1970
/// Runs the shared `test_spawn_child_cascading_abort` scenario on the tokio
/// runner.
#[test]
fn test_tokio_spawn_child_cascading_abort() {
    test_spawn_child_cascading_abort(tokio::Runner::default());
}
1976
/// Runs the shared `test_child_survives_sibling_completion` scenario on a fresh
/// tokio runner for each value of `use_spawn_ref`.
#[test]
fn test_tokio_child_survives_sibling_completion() {
    [false, true].into_iter().for_each(|use_spawn_ref| {
        test_child_survives_sibling_completion(tokio::Runner::default(), use_spawn_ref);
    });
}
1984
/// Runs the shared `test_clone_context_no_child_inheritance` scenario on the
/// tokio runner.
#[test]
fn test_tokio_clone_context_no_child_inheritance() {
    test_clone_context_no_child_inheritance(tokio::Runner::default());
}
1990
/// Runs the shared `test_spawn_blocking` scenario on a fresh tokio runner for
/// each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    });
}
1998
/// Checks that, on the tokio runner, a panicking blocking task makes its handle
/// resolve to `Err(Error::Exited)` for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        tokio::Runner::default().start(|context| async move {
            // Await the handle directly; the panic must surface as an error
            // value rather than unwinding through this task.
            let result = context
                .spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                })
                .await;
            assert!(matches!(result, Err(Error::Exited)));
        });
    }
}
2012
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh tokio runner
/// for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    });
}
2020
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh tokio runner
/// for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    });
}
2028
/// Verifies that `test_spawn_blocking_ref_duplicate` panics on the tokio runner
/// for both values of `dedicated`.
///
/// A test-level `#[should_panic]` is satisfied by the first iteration alone and
/// never reaches `dedicated = true`, so each iteration is wrapped in
/// `catch_unwind` and asserted individually.
#[test]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = tokio::Runner::default();
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated = {dedicated}"
        );
    }
}
2037
/// Runs the shared `test_metrics` scenario on the tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
2043
/// Runs the shared `test_metrics_label` scenario on the tokio runner; the
/// scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
2050
/// Polls the encoded metrics of a tokio runner until an unlabelled
/// `runtime_process_rss` sample with a positive value is present.
///
/// The task sleeps between every unsuccessful poll. The previous version only
/// slept while the metric name was absent, so a sample that was present but not
/// yet positive made the loop spin without ever awaiting.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Consider only sample lines for the metric itself: lines that start
            // with the metric name but do not carry a `{...}` label set.
            let has_positive_rss = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                // The value is the second whitespace-separated field.
                .filter_map(|line| line.split_whitespace().nth(1))
                .any(|value| value.parse::<i64>().expect("Failed to parse RSS value") > 0);
            if has_positive_rss {
                return;
            }

            // Not reported (or not positive) yet: yield before polling again.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
2081
/// End-to-end check of the tokio telemetry endpoint: initialises telemetry with
/// a socket address, registers a counter, then issues a raw HTTP request for
/// `/metrics` over the runtime's own network API and checks that the counter
/// appears in the response body.
///
/// NOTE(review): the address uses the fixed port 8000, so this test can clash
/// with anything else bound to that port on the host.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Initialise telemetry, handing it the address the client dials below.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and bump it once so the output has a known value.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads one line from `stream`, byte by byte, up to `\n`; a trailing
        /// `\r` is removed. Fails with `Error::ReadFailed` on invalid UTF-8.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines until the first empty line and returns them as a
        /// map. Names are stored in ASCII lowercase because HTTP field names
        /// are case-insensitive; lines without a `": "` separator are skipped.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_ascii_lowercase(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes from `stream` as UTF-8 text.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
        }

        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry dialing until a connection is established.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err =?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send a minimal HTTP/1.1 request for the metrics path.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers; the body length comes from `content-length`.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Body must contain the counter registered above.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Propagate any failure from the client task.
        client_handle.await.unwrap();
    });
}
2189}