1#![doc(
21 html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
22 html_favicon_url = "https://commonware.xyz/favicon.ico"
23)]
24
25use commonware_macros::select;
26use commonware_utils::StableBuf;
27use prometheus_client::registry::Metric;
28use std::{
29 future::Future,
30 io::Error as IoError,
31 net::SocketAddr,
32 time::{Duration, SystemTime},
33};
34use thiserror::Error;
35
36#[macro_use]
37mod macros;
38
39pub mod deterministic;
40pub mod mocks;
41cfg_if::cfg_if! {
42 if #[cfg(not(target_arch = "wasm32"))] {
43 pub mod tokio;
44 pub mod benchmarks;
45 }
46}
47mod network;
48mod process;
49mod storage;
50pub mod telemetry;
51mod utils;
52pub use utils::*;
53#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
54mod iouring;
55
/// Label prefix reserved for the runtime.
///
/// [`Metrics::scoped_label`] panics when the label it computes starts with this value.
const METRICS_PREFIX: &str = "runtime";
58
/// Errors returned by the runtime traits declared in this file.
///
/// The `Display` text of each variant is given by its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum Error {
    /// The tests in this file expect this from the handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// The tests in this file expect this from the handle of a task that was aborted.
    #[error("closed")]
    Closed,
    /// Returned by [`Clock::timeout`] when the sleep finishes before the future.
    #[error("timeout")]
    Timeout,
    #[error("bind failed")]
    BindFailed,
    #[error("connection failed")]
    ConnectionFailed,
    #[error("write failed")]
    WriteFailed,
    #[error("read failed")]
    ReadFailed,
    #[error("send failed")]
    SendFailed,
    #[error("recv failed")]
    RecvFailed,
    /// Carries a message describing the failed resolution.
    #[error("dns resolution failed: {0}")]
    ResolveFailed(String),
    // NOTE(review): the `String` in the partition variants below is presumably the partition
    // name — confirm against the storage implementations.
    #[error("partition name invalid, must only contain alphanumeric, dash ('-'), or underscore ('_') characters: {0}")]
    PartitionNameInvalid(String),
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    // NOTE(review): in the blob variants below the two `String`s are rendered as `{0}/{1}`,
    // presumably partition and blob name — confirm against the storage implementations.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    #[error("offset overflow")]
    OffsetOverflow,
    /// Wraps a [`std::io::Error`]; `#[from]` lets `?` convert one automatically.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
105
/// Interface for running a root task and returning its output.
pub trait Runner {
    /// Value passed to the root closure given to [`Runner::start`].
    type Context;

    /// Calls `f` with a [`Runner::Context`] and returns the output of the future `f` produces.
    ///
    /// NOTE(review): implementations presumably drive the future to completion on the calling
    /// thread — confirm against each runtime.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
126
/// Interface for spawning tasks and coordinating their shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Returns a spawner configured with the given `blocking` flag.
    ///
    /// NOTE(review): presumably selects the shared executor, with `blocking` marking work
    /// that may block — confirm against the implementations.
    fn shared(self, blocking: bool) -> Self;

    /// NOTE(review): presumably configures the next task to run on its own thread — confirm
    /// against the implementations.
    fn dedicated(self) -> Self;

    /// NOTE(review): presumably enables per-task instrumentation for the next task — confirm
    /// against the implementations.
    fn instrumented(self) -> Self;

    /// Spawns the future produced by `f`, which receives a spawner of type `Self`, and returns
    /// a [`Handle`] for its output.
    ///
    /// The tests in this file expect the handle of an aborted task to resolve to
    /// [`Error::Closed`] and the handle of a panicked task to resolve to [`Error::Exited`].
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Signals a stop with `value`, optionally bounded by `timeout`.
    ///
    /// The tests in this file expect `Ok(())` once every outstanding signal obtained from
    /// [`Spawner::stopped`] has been dropped, and [`Error::Timeout`] when that does not happen
    /// within `timeout`.
    fn stop(
        self,
        value: i32,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns a signal that the tests in this file await to obtain the `value` passed to
    /// [`Spawner::stop`].
    fn stopped(&self) -> signal::Signal;
}
222
223pub trait Metrics: Clone + Send + Sync + 'static {
225 fn label(&self) -> String;
227
228 fn with_label(&self, label: &str) -> Self;
236
237 fn scoped_label(&self, label: &str) -> String {
241 let label = if self.label().is_empty() {
242 label.to_string()
243 } else {
244 format!("{}_{}", self.label(), label)
245 };
246 assert!(
247 !label.starts_with(METRICS_PREFIX),
248 "using runtime label is not allowed"
249 );
250 label
251 }
252
253 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
257
258 fn encode(&self) -> String;
260}
261
262pub use governor::Quota;
264
/// A [`governor::RateLimiter`] that is not keyed, keeps its state in memory, uses the clock
/// `C`, and applies no middleware.
pub type RateLimiter<C> = governor::RateLimiter<
    governor::state::NotKeyed,
    governor::state::InMemoryState,
    C,
    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
>;
275
/// A [`governor::RateLimiter`] keyed by `K`, with per-key state held in a
/// [`governor::state::keyed::HashMapStateStore`], using the clock `C` and no middleware.
pub type KeyedRateLimiter<K, C> = governor::RateLimiter<
    K,
    governor::state::keyed::HashMapStateStore<K>,
    C,
    governor::middleware::NoOpMiddleware<<C as governor::clock::Clock>::Instant>,
>;
288
/// Interface for reading the time and sleeping.
///
/// Also usable as a [`governor::clock::Clock`] whose instant type is [`SystemTime`].
pub trait Clock:
    governor::clock::Clock<Instant = SystemTime>
    + governor::clock::ReasonablyRealtime
    + Clone
    + Send
    + Sync
    + 'static
{
    /// Returns the current time.
    fn current(&self) -> SystemTime;

    /// Returns a future that resolves after `duration`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that resolves once `deadline` is reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;

    /// Races `future` against `self.sleep(duration)`.
    ///
    /// Resolves to `Ok` with the future's output if the future finishes first, or to
    /// [`Error::Timeout`] if the sleep finishes first.
    fn timeout<F, T>(
        &self,
        duration: Duration,
        future: F,
    ) -> impl Future<Output = Result<T, Error>> + Send + '_
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        async move {
            // The future's arm is listed before the sleep's arm.
            select! {
                result = future => {
                    Ok(result)
                },
                _ = self.sleep(duration) => {
                    Err(Error::Timeout)
                },
            }
        }
    }
}
352
// Everything in this block is compiled only when the `external` feature is enabled.
cfg_if::cfg_if! {
    if #[cfg(feature = "external")] {
        /// Interface for pacing a future with a latency.
        pub trait Pacer: Clock + Clone + Send + Sync + 'static {
            /// Returns a future that yields the output of `future`.
            ///
            /// NOTE(review): presumably `latency` is the time attributed to `future` by the
            /// runtime — confirm against the implementations.
            fn pace<'a, F, T>(
                &'a self,
                latency: Duration,
                future: F,
            ) -> impl Future<Output = T> + Send + 'a
            where
                F: Future<Output = T> + Send + 'a,
                T: Send + 'a;
        }

        /// Extension trait that lets any `Send` future be paced with method syntax.
        pub trait FutureExt: Future + Send + Sized {
            /// Forwards to `pacer.pace(latency, self)`.
            fn pace<'a, E>(
                self,
                pacer: &'a E,
                latency: Duration,
            ) -> impl Future<Output = Self::Output> + Send + 'a
            where
                E: Pacer + 'a,
                Self: Send + 'a,
                Self::Output: Send + 'a,
            {
                pacer.pace(latency, self)
            }
        }

        // Blanket implementation for every `Send` future.
        impl<F> FutureExt for F where F: Future + Send {}
    }
}
409
/// The [`Sink`] type of the [`Listener`] associated with the [`Network`] `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;

/// The [`Stream`] type of the [`Listener`] associated with the [`Network`] `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;

/// The [`Listener`] type associated with the [`Network`] `N`.
pub type ListenerOf<N> = <N as crate::Network>::Listener;
418
/// Interface for binding to and dialing socket addresses.
pub trait Network: Clone + Send + Sync + 'static {
    /// Listener type produced by [`Network::bind`]; its `Sink` and `Stream` types are also
    /// the ones returned by [`Network::dial`].
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Dials `socket` and resolves to a sink and stream pair for the connection.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
439
/// Interface for resolving host names.
pub trait Resolver: Clone + Send + Sync + 'static {
    /// Resolves `host` to a list of IP addresses.
    ///
    /// NOTE(review): failures are presumably reported as [`Error::ResolveFailed`] — confirm
    /// against the implementations.
    fn resolve(
        &self,
        host: &str,
    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, Error>> + Send;
}
450
/// Interface for accepting incoming connections.
pub trait Listener: Sync + Send + 'static {
    /// Sending half returned for each accepted connection.
    type Sink: Sink;
    /// Receiving half returned for each accepted connection.
    type Stream: Stream;

    /// Resolves to the peer's address together with a sink and stream for the connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the address this listener is bound to.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
469
/// Interface for sending messages.
pub trait Sink: Sync + Send + 'static {
    /// Sends `msg`, which is anything convertible into a [`StableBuf`].
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
483
/// Interface for receiving messages.
pub trait Stream: Sync + Send + 'static {
    /// Receives into `buf` and resolves to a [`StableBuf`].
    ///
    /// NOTE(review): presumably fills `buf` completely and hands the same buffer back —
    /// confirm against the implementations.
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
498
/// Interface for blob storage organised into named partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob type returned by [`Storage::open`].
    type Blob: Blob;

    /// Opens the blob `name` in `partition`, resolving to the blob and a `u64`.
    ///
    /// The tests in this file open blobs that were never created and expect the `u64` to equal
    /// the blob's length when a written blob is reopened.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition`.
    ///
    /// The tests in this file pass `None` to remove a whole partition and then expect
    /// [`Storage::scan`] on it to fail with [`Error::PartitionMissing`].
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resolves to the names of the blobs in `partition`.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
542
/// Interface for reading from and writing to a blob at explicit offsets.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads into `buf` starting at `offset` and resolves to a [`StableBuf`].
    ///
    /// The tests in this file expect an error when the requested range extends past the end
    /// of the blob.
    fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;

    /// Writes `buf` starting at `offset`.
    fn write_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resizes the blob to `len` bytes.
    ///
    /// The tests in this file expect bytes added by growing the blob to read back as zero.
    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Syncs the blob.
    ///
    /// NOTE(review): presumably flushes pending writes to durable storage — confirm against
    /// the implementations.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
}
585
586#[cfg(test)]
587mod tests {
588 use super::*;
589 use crate::telemetry::traces::collector::TraceStorage;
590 use bytes::Bytes;
591 use commonware_macros::{select, test_collect_traces};
592 use futures::{
593 channel::{mpsc, oneshot},
594 future::{pending, ready},
595 join, pin_mut, FutureExt, SinkExt, StreamExt,
596 };
597 use prometheus_client::metrics::counter::Counter;
598 use std::{
599 collections::HashMap,
600 net::{IpAddr, Ipv4Addr, Ipv6Addr},
601 pin::Pin,
602 str::FromStr,
603 sync::{
604 atomic::{AtomicU32, Ordering},
605 Arc, Mutex,
606 },
607 task::{Context as TContext, Poll, Waker},
608 };
609 use tracing::{error, Level};
610 use utils::reschedule;
611
612 fn test_error_future<R: Runner>(runner: R) {
613 async fn error_future() -> Result<&'static str, &'static str> {
614 Err("An error occurred")
615 }
616 let result = runner.start(|_| error_future());
617 assert_eq!(result, Err("An error occurred"));
618 }
619
620 fn test_clock_sleep<R: Runner>(runner: R)
621 where
622 R::Context: Spawner + Clock,
623 {
624 runner.start(|context| async move {
625 let start = context.current();
627 let sleep_duration = Duration::from_millis(10);
628 context.sleep(sleep_duration).await;
629
630 let end = context.current();
632 assert!(end.duration_since(start).unwrap() >= sleep_duration);
633 });
634 }
635
636 fn test_clock_sleep_until<R: Runner>(runner: R)
637 where
638 R::Context: Spawner + Clock + Metrics,
639 {
640 runner.start(|context| async move {
641 let now = context.current();
643 context.sleep_until(now + Duration::from_millis(100)).await;
644
645 let elapsed = now.elapsed().unwrap();
647 assert!(elapsed >= Duration::from_millis(100));
648 });
649 }
650
651 fn test_clock_timeout<R: Runner>(runner: R)
652 where
653 R::Context: Spawner + Clock,
654 {
655 runner.start(|context| async move {
656 let result = context
658 .timeout(Duration::from_millis(100), async { "success" })
659 .await;
660 assert_eq!(result.unwrap(), "success");
661
662 let result = context
664 .timeout(Duration::from_millis(50), pending::<()>())
665 .await;
666 assert!(matches!(result, Err(Error::Timeout)));
667
668 let result = context
670 .timeout(
671 Duration::from_millis(100),
672 context.sleep(Duration::from_millis(50)),
673 )
674 .await;
675 assert!(result.is_ok());
676 });
677 }
678
    /// Checks that `Runner::start` returns once the root future finishes, even though a
    /// spawned task is still looping.
    fn test_root_finishes<R: Runner>(runner: R)
    where
        R::Context: Spawner,
    {
        runner.start(|context| async move {
            // This task never finishes on its own; it only yields in a loop
            context.spawn(|_| async move {
                loop {
                    reschedule().await;
                }
            });
        });
    }
691
    /// Checks that a task spawned from a context clone, after the task spawned from the
    /// original context was aborted, resolves to `Error::Closed`.
    fn test_spawn_after_abort<R>(runner: R)
    where
        R: Runner,
        R::Context: Spawner + Clone,
    {
        runner.start(|context| async move {
            // Clone the context before it is consumed by `spawn`
            let child = context.clone();

            // Spawn a task that never completes, then abort it
            let parent_handle = context.spawn(move |_| async move {
                pending::<()>().await;
            });
            parent_handle.abort();

            // Spawn from the clone taken earlier; the handle is expected to report `Closed`
            let child_handle = child.spawn(move |_| async move {
                pending::<()>().await;
            });
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
714
    /// Checks that aborting a looping task makes its handle resolve to `Error::Closed`, for
    /// the spawner configuration selected by `dedicated` and `blocking`.
    ///
    /// `blocking` must be `false` when `dedicated` is `true`.
    fn test_spawn_abort<R: Runner>(runner: R, dedicated: bool, blocking: bool)
    where
        R::Context: Spawner,
    {
        runner.start(|context| async move {
            // Select the spawner configuration under test
            let context = if dedicated {
                assert!(!blocking);
                context.dedicated()
            } else {
                context.shared(blocking)
            };

            // Spawn a task that only yields in a loop, then abort it
            let handle = context.spawn(|_| async move {
                loop {
                    reschedule().await;
                }
            });
            handle.abort();
            assert!(matches!(handle.await, Err(Error::Closed)));
        });
    }
736
    /// Starts a root future that panics immediately.
    ///
    /// If `start` returns, the result must be an `Err`; `unwrap_err` panics otherwise.
    /// NOTE(review): callers presumably run this under `#[should_panic]` or with a runner
    /// that catches panics — confirm at the call sites.
    fn test_panic_aborts_root<R: Runner>(runner: R) {
        let result: Result<(), Error> = runner.start(|_| async move {
            panic!("blah");
        });
        result.unwrap_err();
    }
743
    /// Spawns a task that panics while the root future sleeps in an endless loop.
    ///
    /// The root future never returns on its own, so this function only ends if the panic
    /// takes the runner down.
    /// NOTE(review): callers presumably expect the panic to propagate — confirm at the call
    /// sites.
    fn test_panic_aborts_spawn<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            context.clone().spawn(|_| async move {
                panic!("blah");
            });

            // Keep the root alive indefinitely
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
759
760 fn test_panic_aborts_spawn_caught<R: Runner>(runner: R)
761 where
762 R::Context: Spawner + Clock,
763 {
764 let result: Result<(), Error> = runner.start(|context| async move {
765 let result = context.clone().spawn(|_| async move {
766 panic!("blah");
767 });
768 result.await
769 });
770 assert!(matches!(result, Err(Error::Exited)));
771 }
772
    /// Spawns three tasks that each panic while the root future sleeps in an endless loop.
    ///
    /// The root future never returns on its own, so this function only ends if a panic takes
    /// the runner down.
    /// NOTE(review): callers presumably expect a panic to propagate — confirm at the call
    /// sites.
    fn test_multiple_panics<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            context.clone().spawn(|_| async move {
                panic!("boom 1");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 2");
            });
            context.clone().spawn(|_| async move {
                panic!("boom 3");
            });

            // Keep the root alive indefinitely
            loop {
                context.sleep(Duration::from_millis(100)).await;
            }
        });
    }
794
795 fn test_multiple_panics_caught<R: Runner>(runner: R)
796 where
797 R::Context: Spawner + Clock,
798 {
799 let (res1, res2, res3) = runner.start(|context| async move {
800 let handle1 = context.clone().spawn(|_| async move {
801 panic!("boom 1");
802 });
803 let handle2 = context.clone().spawn(|_| async move {
804 panic!("boom 2");
805 });
806 let handle3 = context.clone().spawn(|_| async move {
807 panic!("boom 3");
808 });
809
810 join!(handle1, handle2, handle3)
811 });
812 assert!(matches!(res1, Err(Error::Exited)));
813 assert!(matches!(res2, Err(Error::Exited)));
814 assert!(matches!(res3, Err(Error::Exited)));
815 }
816
    /// Checks which `select!` arm runs when both futures are ready and when only the second
    /// one is.
    fn test_select<R: Runner>(runner: R) {
        runner.start(|_| async move {
            // Both futures are ready: the test expects the first arm to run
            let output = Mutex::new(0);
            select! {
                v1 = ready(1) => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 1);

            // The first future never resolves: the second arm must run
            select! {
                v1 = std::future::pending::<i32>() => {
                    *output.lock().unwrap() = v1;
                },
                v2 = ready(2) => {
                    *output.lock().unwrap() = v2;
                },
            };
            assert_eq!(*output.lock().unwrap(), 2);
        });
    }
843
    /// Checks that using `select!` repeatedly against the same channel receiver neither
    /// produces values early nor loses values sent later.
    fn test_select_loop<R: Runner>(runner: R)
    where
        R::Context: Clock,
    {
        runner.start(|context| async move {
            // Nothing has been sent yet, so the sleep arm must win both times
            let (mut sender, mut receiver) = mpsc::unbounded();
            for _ in 0..2 {
                select! {
                    v = receiver.next() => {
                        panic!("unexpected value: {v:?}");
                    },
                    _ = context.sleep(Duration::from_millis(100)) => {
                        continue;
                    },
                };
            }

            // Queue two values
            sender.send(0).await.unwrap();
            sender.send(1).await.unwrap();

            // An immediately-ready arm listed first must be chosen over the receiver, leaving
            // both queued values in place
            select! {
                _ = async {} => {
                },
                v = receiver.next() => {
                    panic!("unexpected value: {v:?}");
                },
            };

            // Both values are still delivered, in order, before the sleep elapses
            for i in 0..2 {
                select! {
                    _ = context.sleep(Duration::from_millis(100)) => {
                        panic!("timeout");
                    },
                    v = receiver.next() => {
                        assert_eq!(v.unwrap(), i);
                    },
                };
            }
        });
    }
890
891 fn test_storage_operations<R: Runner>(runner: R)
892 where
893 R::Context: Storage,
894 {
895 runner.start(|context| async move {
896 let partition = "test_partition";
897 let name = b"test_blob";
898
899 let (blob, _) = context
901 .open(partition, name)
902 .await
903 .expect("Failed to open blob");
904
905 let data = b"Hello, Storage!";
907 blob.write_at(Vec::from(data), 0)
908 .await
909 .expect("Failed to write to blob");
910
911 blob.sync().await.expect("Failed to sync blob");
913
914 let read = blob
916 .read_at(vec![0; data.len()], 0)
917 .await
918 .expect("Failed to read from blob");
919 assert_eq!(read.as_ref(), data);
920
921 blob.sync().await.expect("Failed to sync blob");
923
924 let blobs = context
926 .scan(partition)
927 .await
928 .expect("Failed to scan partition");
929 assert!(blobs.contains(&name.to_vec()));
930
931 let (blob, len) = context
933 .open(partition, name)
934 .await
935 .expect("Failed to reopen blob");
936 assert_eq!(len, data.len() as u64);
937
938 let read = blob
940 .read_at(vec![0u8; 7], 7)
941 .await
942 .expect("Failed to read data");
943 assert_eq!(read.as_ref(), b"Storage");
944
945 blob.sync().await.expect("Failed to sync blob");
947
948 context
950 .remove(partition, Some(name))
951 .await
952 .expect("Failed to remove blob");
953
954 let blobs = context
956 .scan(partition)
957 .await
958 .expect("Failed to scan partition");
959 assert!(!blobs.contains(&name.to_vec()));
960
961 context
963 .remove(partition, None)
964 .await
965 .expect("Failed to remove partition");
966
967 let result = context.scan(partition).await;
969 assert!(matches!(result, Err(Error::PartitionMissing(_))));
970 });
971 }
972
973 fn test_blob_read_write<R: Runner>(runner: R)
974 where
975 R::Context: Storage,
976 {
977 runner.start(|context| async move {
978 let partition = "test_partition";
979 let name = b"test_blob_rw";
980
981 let (blob, _) = context
983 .open(partition, name)
984 .await
985 .expect("Failed to open blob");
986
987 let data1 = b"Hello";
989 let data2 = b"World";
990 blob.write_at(Vec::from(data1), 0)
991 .await
992 .expect("Failed to write data1");
993 blob.write_at(Vec::from(data2), 5)
994 .await
995 .expect("Failed to write data2");
996
997 let read = blob
999 .read_at(vec![0u8; 10], 0)
1000 .await
1001 .expect("Failed to read data");
1002 assert_eq!(&read.as_ref()[..5], data1);
1003 assert_eq!(&read.as_ref()[5..], data2);
1004
1005 let result = blob.read_at(vec![0u8; 10], 10).await;
1007 assert!(result.is_err());
1008
1009 let data3 = b"Store";
1011 blob.write_at(Vec::from(data3), 5)
1012 .await
1013 .expect("Failed to write data3");
1014
1015 let read = blob
1017 .read_at(vec![0u8; 10], 0)
1018 .await
1019 .expect("Failed to read data");
1020 assert_eq!(&read.as_ref()[..5], data1);
1021 assert_eq!(&read.as_ref()[5..], data3);
1022
1023 let result = blob.read_at(vec![0u8; 10], 10).await;
1025 assert!(result.is_err());
1026 });
1027 }
1028
1029 fn test_blob_resize<R: Runner>(runner: R)
1030 where
1031 R::Context: Storage,
1032 {
1033 runner.start(|context| async move {
1034 let partition = "test_partition_resize";
1035 let name = b"test_blob_resize";
1036
1037 let (blob, _) = context
1039 .open(partition, name)
1040 .await
1041 .expect("Failed to open blob");
1042
1043 let data = b"some data";
1044 blob.write_at(data.to_vec(), 0)
1045 .await
1046 .expect("Failed to write");
1047 blob.sync().await.expect("Failed to sync after write");
1048
1049 let (blob, len) = context.open(partition, name).await.unwrap();
1051 assert_eq!(len, data.len() as u64);
1052
1053 let new_len = (data.len() as u64) * 2;
1055 blob.resize(new_len)
1056 .await
1057 .expect("Failed to resize to extend");
1058 blob.sync().await.expect("Failed to sync after resize");
1059
1060 let (blob, len) = context.open(partition, name).await.unwrap();
1062 assert_eq!(len, new_len);
1063
1064 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1066 assert_eq!(read_buf.as_ref(), data);
1067
1068 let extended_part = blob
1070 .read_at(vec![0; data.len()], data.len() as u64)
1071 .await
1072 .unwrap();
1073 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
1074
1075 blob.resize(data.len() as u64).await.unwrap();
1077 blob.sync().await.unwrap();
1078
1079 let (blob, size) = context.open(partition, name).await.unwrap();
1081 assert_eq!(size, data.len() as u64);
1082
1083 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1085 assert_eq!(read_buf.as_ref(), data);
1086 blob.sync().await.unwrap();
1087 });
1088 }
1089
1090 fn test_many_partition_read_write<R: Runner>(runner: R)
1091 where
1092 R::Context: Storage,
1093 {
1094 runner.start(|context| async move {
1095 let partitions = ["partition1", "partition2", "partition3"];
1096 let name = b"test_blob_rw";
1097 let data1 = b"Hello";
1098 let data2 = b"World";
1099
1100 for (additional, partition) in partitions.iter().enumerate() {
1101 let (blob, _) = context
1103 .open(partition, name)
1104 .await
1105 .expect("Failed to open blob");
1106
1107 blob.write_at(Vec::from(data1), 0)
1109 .await
1110 .expect("Failed to write data1");
1111 blob.write_at(Vec::from(data2), 5 + additional as u64)
1112 .await
1113 .expect("Failed to write data2");
1114
1115 blob.sync().await.expect("Failed to sync blob");
1117 }
1118
1119 for (additional, partition) in partitions.iter().enumerate() {
1120 let (blob, len) = context
1122 .open(partition, name)
1123 .await
1124 .expect("Failed to open blob");
1125 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
1126
1127 let read = blob
1129 .read_at(vec![0u8; 10 + additional], 0)
1130 .await
1131 .expect("Failed to read data");
1132 assert_eq!(&read.as_ref()[..5], b"Hello");
1133 assert_eq!(&read.as_ref()[5 + additional..], b"World");
1134 }
1135 });
1136 }
1137
1138 fn test_blob_read_past_length<R: Runner>(runner: R)
1139 where
1140 R::Context: Storage,
1141 {
1142 runner.start(|context| async move {
1143 let partition = "test_partition";
1144 let name = b"test_blob_rw";
1145
1146 let (blob, _) = context
1148 .open(partition, name)
1149 .await
1150 .expect("Failed to open blob");
1151
1152 let result = blob.read_at(vec![0u8; 10], 0).await;
1154 assert!(result.is_err());
1155
1156 let data = b"Hello, Storage!".to_vec();
1158 blob.write_at(data, 0)
1159 .await
1160 .expect("Failed to write to blob");
1161
1162 let result = blob.read_at(vec![0u8; 20], 0).await;
1164 assert!(result.is_err());
1165 })
1166 }
1167
    /// Checks that clones of a blob can be read from separate tasks and that the encoded
    /// metrics contain `open_blobs 0` after the last blob value is dropped.
    fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
    where
        R::Context: Spawner + Storage + Metrics,
    {
        runner.start(|context| async move {
            let partition = "test_partition";
            let name = b"test_blob_rw";

            // Open the blob
            let (blob, _) = context
                .open(partition, name)
                .await
                .expect("Failed to open blob");

            // Write the data every reader will check
            let data = b"Hello, Storage!";
            blob.write_at(Vec::from(data), 0)
                .await
                .expect("Failed to write to blob");

            // Sync the blob
            blob.sync().await.expect("Failed to sync blob");

            // Read from two tasks, each owning its own clone of the blob
            let check1 = context.with_label("check1").spawn({
                let blob = blob.clone();
                move |_| async move {
                    let read = blob
                        .read_at(vec![0u8; data.len()], 0)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.as_ref(), data);
                }
            });
            let check2 = context.with_label("check2").spawn({
                let blob = blob.clone();
                move |_| async move {
                    let read = blob
                        .read_at(vec![0; data.len()], 0)
                        .await
                        .expect("Failed to read from blob");
                    assert_eq!(read.as_ref(), data);
                }
            });

            // Both tasks must finish without error
            let result = join!(check1, check2);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());

            // The original blob is still readable
            let read = blob
                .read_at(vec![0; data.len()], 0)
                .await
                .expect("Failed to read from blob");
            assert_eq!(read.as_ref(), data);

            // Drop the last blob value held by this test
            drop(blob);

            // The metrics output must report zero open blobs
            let buffer = context.encode();
            assert!(buffer.contains("open_blobs 0"));
        });
    }
1233
    /// Checks that tasks spawned before and after `stop` both observe the stop value.
    fn test_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 9;
        runner.start(|context| async move {
            // Spawn a task that waits for the stop signal
            let before = context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    drop(signal);
                });

            // Stop with no timeout; this must succeed
            let result = context.clone().stop(kill, None).await;
            assert!(result.is_ok());

            // Spawn a task after `stop` has completed
            let after = context
                .with_label("after")
                .spawn(move |context| async move {
                    // The signal must still resolve to the stop value
                    let value = context.stopped().await.unwrap();
                    assert_eq!(value, kill);
                });

            // Both tasks must finish without error
            let result = join!(before, after);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
        });
    }
1269
    /// Checks that `stop` does not complete until every task holding a signal has finished
    /// its cleanup and dropped the signal.
    fn test_shutdown_multiple_signals<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            let (started_tx, mut started_rx) = mpsc::channel(3);
            let counter = Arc::new(AtomicU32::new(0));

            // Builds a task that takes a signal, reports that it started, waits for the stop
            // value, sleeps for `cleanup_duration`, and bumps the counter before dropping the
            // signal
            let task = |cleanup_duration: Duration| {
                let context = context.clone();
                let counter = counter.clone();
                let mut started_tx = started_tx.clone();
                context.spawn(move |context| async move {
                    // Obtain the signal before announcing readiness
                    let mut signal = context.stopped();
                    started_tx.send(()).await.unwrap();

                    // Wait for the stop value, then do the simulated cleanup
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill);
                    context.sleep(cleanup_duration).await;
                    counter.fetch_add(1, Ordering::SeqCst);

                    // Release the signal only after cleanup
                    drop(signal);
                })
            };

            let task1 = task(Duration::from_millis(10));
            let task2 = task(Duration::from_millis(20));
            let task3 = task(Duration::from_millis(30));

            // Wait until all three tasks hold their signal
            for _ in 0..3 {
                started_rx.next().await.unwrap();
            }

            // By the time `stop` returns, all three cleanups must have run
            context.stop(kill, None).await.unwrap();
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // All tasks must finish without error
            let result = join!(task1, task2, task3);
            assert!(result.0.is_ok());
            assert!(result.1.is_ok());
            assert!(result.2.is_ok());
        });
    }
1321
    /// Checks that `stop` returns `Error::Timeout` when a task keeps its signal past the
    /// timeout.
    fn test_shutdown_timeout<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill = 42;
        runner.start(|context| async move {
            // Used by the task to report that it holds its signal
            let (started_tx, started_rx) = oneshot::channel();

            // Spawn a task that takes a signal and then blocks forever
            context.clone().spawn(move |context| async move {
                let signal = context.stopped();
                started_tx.send(()).unwrap();
                // `pending` never resolves, so the line below is never reached and the signal
                // stays alive
                pending::<()>().await;
                signal.await.unwrap();
            });

            // Stop with a timeout once the task holds its signal
            started_rx.await.unwrap();
            let result = context.stop(kill, Some(Duration::from_millis(100))).await;

            // The stop must time out
            assert!(matches!(result, Err(Error::Timeout)));
        });
    }
1347
    /// Checks that concurrent `stop` calls both complete, that the first value is the one
    /// observed, and that a later `stop` completes immediately.
    fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics + Clock,
    {
        let kill1 = 42;
        let kill2 = 43;

        runner.start(|context| async move {
            let (started_tx, started_rx) = oneshot::channel();
            let counter = Arc::new(AtomicU32::new(0));

            // Spawn a task that delays shutdown by sleeping after it receives the stop value
            let task = context.with_label("blocking_task").spawn({
                let counter = counter.clone();
                move |context| async move {
                    // Obtain the signal before announcing readiness
                    let mut signal = context.stopped();
                    started_tx.send(()).unwrap();

                    // The value observed must be the one from the first `stop` call
                    let value = (&mut signal).await.unwrap();
                    assert_eq!(value, kill1);
                    context.sleep(Duration::from_millis(50)).await;

                    // Record completion, then release the signal
                    counter.fetch_add(1, Ordering::SeqCst);
                    drop(signal);
                }
            });

            // Wait until the task holds its signal
            started_rx.await.unwrap();

            // Create two stop futures with different values; neither has been polled yet
            let stop_task1 = context.clone().stop(kill1, None);
            pin_mut!(stop_task1);
            let stop_task2 = context.clone().stop(kill2, None);
            pin_mut!(stop_task2);

            // Neither can be ready on its first poll while the task still holds its signal
            assert!(stop_task1.as_mut().now_or_never().is_none());
            assert!(stop_task2.as_mut().now_or_never().is_none());

            // Both must eventually succeed
            assert!(stop_task1.await.is_ok());
            assert!(stop_task2.await.is_ok());

            // The recorded stop value is the first one
            let sig = context.stopped().await;
            assert_eq!(sig.unwrap(), kill1);

            // The task finished its cleanup exactly once
            let result = task.await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // A stop issued after shutdown has completed is ready on its first poll
            assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
        });
    }
1409
    /// Checks that the root can finish while a spawned task is still waiting on a stop signal
    /// that is never sent.
    fn test_unfulfilled_shutdown<R: Runner>(runner: R)
    where
        R::Context: Spawner + Metrics,
    {
        runner.start(|context| async move {
            // Spawn a task that waits for a stop value; `stop` is never called in this test
            context
                .with_label("before")
                .spawn(move |context| async move {
                    let mut signal = context.stopped();
                    let value = (&mut signal).await.unwrap();

                    // Not reached unless a stop with this value is issued
                    assert_eq!(value, 42);
                    drop(signal);
                });

            // Yield once before the root returns
            reschedule().await;
        });
    }
1431
1432 fn test_spawn_dedicated<R: Runner>(runner: R)
1433 where
1434 R::Context: Spawner,
1435 {
1436 runner.start(|context| async move {
1437 let handle = context.dedicated().spawn(|_| async move { 42 });
1438 assert!(matches!(handle.await, Ok(42)));
1439 });
1440 }
1441
    /// Checks that a child task spawned from within a parent task completes normally while
    /// the parent is still running, and that the parent then completes too.
    fn test_spawn<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Shared slot through which the parent hands the child's handle to the root
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn a child that finishes immediately
                let handle = context.spawn(|_| async {});

                // Publish the child's handle
                *child_handle2.lock().unwrap() = Some(handle);

                parent_initialized_tx.send(()).unwrap();

                // Stay alive until the root says otherwise
                parent_complete_rx.await.unwrap();
            });

            // Wait until the parent has published the child's handle
            parent_initialized_rx.await.unwrap();

            // The child must complete successfully while the parent is still waiting
            let child_handle = child_handle.lock().unwrap().take().unwrap();
            assert!(child_handle.await.is_ok());

            // Let the parent finish
            parent_complete_tx.send(()).unwrap();

            // The parent must complete successfully
            assert!(parent_handle.await.is_ok());
        });
    }
1479
    /// Checks that aborting a parent task also closes a child task it spawned.
    fn test_spawn_abort_on_parent_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Shared slot through which the parent hands the child's handle to the root
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_initialized_tx, parent_initialized_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn a child that never completes on its own
                let handle = context.spawn(|_| pending::<()>());

                // Publish the child's handle
                *child_handle2.lock().unwrap() = Some(handle);

                parent_initialized_tx.send(()).unwrap();

                // The parent never completes on its own either
                pending::<()>().await
            });

            // Wait until the parent has published the child's handle
            parent_initialized_rx.await.unwrap();

            // Abort the parent
            parent_handle.abort();
            assert!(matches!(parent_handle.await, Err(Error::Closed)));

            // The child must have been closed as well
            let child_handle = child_handle.lock().unwrap().take().unwrap();
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1514
    /// Checks that a child task still pending when its parent completes is closed.
    fn test_spawn_abort_on_parent_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Shared slot through which the parent hands the child's handle to the root
            let child_handle = Arc::new(Mutex::new(None));
            let child_handle2 = child_handle.clone();

            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();
            let parent_handle = context.spawn(move |context| async move {
                // Spawn a child that never completes on its own
                let handle = context.spawn(|_| pending::<()>());

                // Publish the child's handle
                *child_handle2.lock().unwrap() = Some(handle);

                // Finish as soon as the root allows it
                parent_complete_rx.await.unwrap();
            });

            // Allow the parent to finish
            parent_complete_tx.send(()).unwrap();

            // The parent must complete successfully; awaiting it also guarantees the child's
            // handle has been published
            assert!(parent_handle.await.is_ok());

            // The child must have been closed
            let child_handle = child_handle.lock().unwrap().take().unwrap();
            assert!(matches!(child_handle.await, Err(Error::Closed)));
        });
    }
1546
    /// Checks that aborting a root task closes every task spawned beneath it.
    ///
    /// The root task spawns three tasks from clones of the context (`c0`..`c2`), and each of
    /// those spawns two more from further clones (`g0`..`g5`), for nine handles in total.
    fn test_spawn_cascading_abort<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // Prepare every context clone up front; each `gN` is cloned from the `cN` whose
            // task will use it
            let c0 = context.clone();
            let g0 = c0.clone();
            let g1 = c0.clone();
            let c1 = context.clone();
            let g2 = c1.clone();
            let g3 = c1.clone();
            let c2 = context.clone();
            let g4 = c2.clone();
            let g5 = c2.clone();

            // Collects the handles of all nine descendant tasks
            let handles = Arc::new(Mutex::new(Vec::new()));
            let (mut initialized_tx, mut initialized_rx) = mpsc::channel(9);
            let root_task = context.spawn({
                let handles = handles.clone();
                move |_| async move {
                    for (context, grandchildren) in [(c0, [g0, g1]), (c1, [g2, g3]), (c2, [g4, g5])]
                    {
                        let handle = context.spawn({
                            let handles = handles.clone();
                            let mut initialized_tx = initialized_tx.clone();
                            move |_| async move {
                                for grandchild in grandchildren {
                                    // Each innermost task waits forever
                                    let handle = grandchild.spawn(|_| async {
                                        pending::<()>().await;
                                    });
                                    handles.lock().unwrap().push(handle);
                                    initialized_tx.send(()).await.unwrap();
                                }

                                // Keep this task alive
                                pending::<()>().await;
                            }
                        });
                        handles.lock().unwrap().push(handle);
                        initialized_tx.send(()).await.unwrap();
                    }

                    // Keep the root task alive
                    pending::<()>().await;
                }
            });

            // Wait until all nine tasks have been spawned and recorded
            for _ in 0..9 {
                initialized_rx.next().await.unwrap();
            }

            // Every handle must have been collected
            assert_eq!(handles.lock().unwrap().len(), 9);

            // Abort the root task
            root_task.abort();
            assert!(matches!(root_task.await, Err(Error::Closed)));

            // Every descendant must have been closed as well
            let handles = handles.lock().unwrap().drain(..).collect::<Vec<_>>();
            for handle in handles {
                assert!(matches!(handle.await, Err(Error::Closed)));
            }
        });
    }
1621
    /// Checks that a child task keeps running after a sibling spawned by the same parent
    /// completes.
    fn test_child_survives_sibling_completion<R: Runner>(runner: R)
    where
        R::Context: Spawner + Clock,
    {
        runner.start(|context| async move {
            // One "started", "complete", and "handle" channel each for the child and the
            // sibling, plus a "complete" channel for the parent
            let (child_started_tx, child_started_rx) = oneshot::channel();
            let (child_complete_tx, child_complete_rx) = oneshot::channel();
            let (child_handle_tx, child_handle_rx) = oneshot::channel();
            let (sibling_started_tx, sibling_started_rx) = oneshot::channel();
            let (sibling_complete_tx, sibling_complete_rx) = oneshot::channel();
            let (sibling_handle_tx, sibling_handle_rx) = oneshot::channel();
            let (parent_complete_tx, parent_complete_rx) = oneshot::channel();

            let parent = context.spawn(move |context| async move {
                // Spawn the child; it reports that it started and then waits for permission
                // to finish
                let child_handle = context.clone().spawn(|_| async move {
                    child_started_tx.send(()).unwrap();
                    child_complete_rx.await.unwrap();
                });
                assert!(
                    child_handle_tx.send(child_handle).is_ok(),
                    "child handle receiver dropped"
                );

                // Spawn the sibling with the same shape
                let sibling_handle = context.clone().spawn(move |_| async move {
                    sibling_started_tx.send(()).unwrap();
                    sibling_complete_rx.await.unwrap();
                });
                assert!(
                    sibling_handle_tx.send(sibling_handle).is_ok(),
                    "sibling handle receiver dropped"
                );

                // Stay alive until the root says otherwise
                parent_complete_rx.await.unwrap();
            });

            // Wait until both tasks are running
            child_started_rx.await.unwrap();
            sibling_started_rx.await.unwrap();

            // Let the sibling finish first. The assertion only checks that the handle was
            // received from the channel; the handle itself is not awaited here.
            sibling_complete_tx.send(()).unwrap();
            assert!(sibling_handle_rx.await.is_ok());

            // The child is still waiting, so this send must succeed; the same note about the
            // handle applies
            child_complete_tx.send(()).unwrap();
            assert!(child_handle_rx.await.is_ok());

            // Let the parent finish
            parent_complete_tx.send(()).unwrap();
            assert!(parent.await.is_ok());
        });
    }
1679
1680 fn test_spawn_clone_chain<R: Runner>(runner: R)
1681 where
1682 R::Context: Spawner + Clock,
1683 {
1684 runner.start(|context| async move {
1685 let (parent_started_tx, parent_started_rx) = oneshot::channel();
1686 let (child_started_tx, child_started_rx) = oneshot::channel();
1687 let (grandchild_started_tx, grandchild_started_rx) = oneshot::channel();
1688 let (child_handle_tx, child_handle_rx) = oneshot::channel();
1689 let (grandchild_handle_tx, grandchild_handle_rx) = oneshot::channel();
1690
1691 let parent = context.clone().spawn({
1692 move |context| async move {
1693 let child = context.clone().spawn({
1694 move |context| async move {
1695 let grandchild = context.clone().spawn({
1696 move |_| async move {
1697 grandchild_started_tx.send(()).unwrap();
1698 pending::<()>().await;
1699 }
1700 });
1701 assert!(
1702 grandchild_handle_tx.send(grandchild).is_ok(),
1703 "grandchild handle receiver dropped"
1704 );
1705 child_started_tx.send(()).unwrap();
1706 pending::<()>().await;
1707 }
1708 });
1709 assert!(
1710 child_handle_tx.send(child).is_ok(),
1711 "child handle receiver dropped"
1712 );
1713 parent_started_tx.send(()).unwrap();
1714 pending::<()>().await;
1715 }
1716 });
1717
1718 parent_started_rx.await.unwrap();
1719 child_started_rx.await.unwrap();
1720 grandchild_started_rx.await.unwrap();
1721
1722 let child_handle = child_handle_rx.await.unwrap();
1723 let grandchild_handle = grandchild_handle_rx.await.unwrap();
1724
1725 parent.abort();
1726 assert!(parent.await.is_err());
1727
1728 assert!(child_handle.await.is_err());
1729 assert!(grandchild_handle.await.is_err());
1730 });
1731 }
1732
1733 fn test_spawn_sparse_clone_chain<R: Runner>(runner: R)
1734 where
1735 R::Context: Spawner + Clock,
1736 {
1737 runner.start(|context| async move {
1738 let (leaf_started_tx, leaf_started_rx) = oneshot::channel();
1739 let (leaf_handle_tx, leaf_handle_rx) = oneshot::channel();
1740
1741 let parent = context.clone().spawn({
1742 move |context| async move {
1743 let clone1 = context.clone();
1744 let clone2 = clone1.clone();
1745 let clone3 = clone2.clone();
1746
1747 let leaf = clone3.clone().spawn({
1748 move |_| async move {
1749 leaf_started_tx.send(()).unwrap();
1750 pending::<()>().await;
1751 }
1752 });
1753
1754 leaf_handle_tx
1755 .send(leaf)
1756 .unwrap_or_else(|_| panic!("leaf handle receiver dropped"));
1757 pending::<()>().await;
1758 }
1759 });
1760
1761 leaf_started_rx.await.unwrap();
1762 let leaf_handle = leaf_handle_rx.await.unwrap();
1763
1764 parent.abort();
1765 assert!(parent.await.is_err());
1766 assert!(leaf_handle.await.is_err());
1767 });
1768 }
1769
1770 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1771 where
1772 R::Context: Spawner,
1773 {
1774 runner.start(|context| async move {
1775 let context = if dedicated {
1776 context.dedicated()
1777 } else {
1778 context.shared(true)
1779 };
1780
1781 let handle = context.spawn(|_| async move { 42 });
1782 let result = handle.await;
1783 assert!(matches!(result, Ok(42)));
1784 });
1785 }
1786
1787 fn test_spawn_blocking_panic<R: Runner>(runner: R, dedicated: bool)
1788 where
1789 R::Context: Spawner + Clock,
1790 {
1791 runner.start(|context| async move {
1792 let context = if dedicated {
1793 context.dedicated()
1794 } else {
1795 context.shared(true)
1796 };
1797
1798 context.clone().spawn(|_| async move {
1799 panic!("blocking task panicked");
1800 });
1801
1802 loop {
1804 context.sleep(Duration::from_millis(100)).await;
1805 }
1806 });
1807 }
1808
1809 fn test_spawn_blocking_panic_caught<R: Runner>(runner: R, dedicated: bool)
1810 where
1811 R::Context: Spawner + Clock,
1812 {
1813 let result: Result<(), Error> = runner.start(|context| async move {
1814 let context = if dedicated {
1815 context.dedicated()
1816 } else {
1817 context.shared(true)
1818 };
1819
1820 let handle = context.clone().spawn(|_| async move {
1821 panic!("blocking task panicked");
1822 });
1823 handle.await
1824 });
1825 assert!(matches!(result, Err(Error::Exited)));
1826 }
1827
1828 fn test_circular_reference_prevents_cleanup<R: Runner>(runner: R) {
1829 runner.start(|_| async move {
1830 let dropper = Arc::new(());
1832 let executor = deterministic::Runner::default();
1833 executor.start({
1834 let dropper = dropper.clone();
1835 move |context| async move {
1836 let (mut setup_tx, mut setup_rx) = mpsc::unbounded::<()>();
1838 let (mut tx1, mut rx1) = mpsc::unbounded::<()>();
1839 let (mut tx2, mut rx2) = mpsc::unbounded::<()>();
1840
1841 context.with_label("task1").spawn({
1843 let mut setup_tx = setup_tx.clone();
1844 let dropper = dropper.clone();
1845 move |_| async move {
1846 tx2.send(()).await.unwrap();
1848 rx1.next().await.unwrap();
1849 setup_tx.send(()).await.unwrap();
1850
1851 while rx1.next().await.is_some() {}
1853 drop(tx2);
1854 drop(dropper);
1855 }
1856 });
1857
1858 context.with_label("task2").spawn(move |_| async move {
1860 tx1.send(()).await.unwrap();
1862 rx2.next().await.unwrap();
1863 setup_tx.send(()).await.unwrap();
1864
1865 while rx2.next().await.is_some() {}
1867 drop(tx1);
1868 drop(dropper);
1869 });
1870
1871 setup_rx.next().await.unwrap();
1873 setup_rx.next().await.unwrap();
1874 }
1875 });
1876
1877 Arc::try_unwrap(dropper).expect("references remaining");
1879 });
1880 }
1881
1882 fn test_late_waker<R: Runner>(runner: R)
1883 where
1884 R::Context: Metrics + Spawner,
1885 {
1886 struct CaptureWaker {
1889 tx: Option<oneshot::Sender<Waker>>,
1890 sent: bool,
1891 }
1892 impl Future for CaptureWaker {
1893 type Output = ();
1894 fn poll(mut self: Pin<&mut Self>, cx: &mut TContext<'_>) -> Poll<Self::Output> {
1895 if !self.sent {
1896 if let Some(tx) = self.tx.take() {
1897 let _ = tx.send(cx.waker().clone());
1899 }
1900 self.sent = true;
1901 }
1902 Poll::Pending
1903 }
1904 }
1905
1906 struct WakeOnDrop(Option<Waker>);
1908 impl Drop for WakeOnDrop {
1909 fn drop(&mut self) {
1910 if let Some(w) = self.0.take() {
1911 w.wake_by_ref();
1912 }
1913 }
1914 }
1915
1916 let holder = runner.start(|context| async move {
1918 let (tx, rx) = oneshot::channel::<Waker>();
1920
1921 context
1923 .with_label("capture_waker")
1924 .spawn(move |_| async move {
1925 CaptureWaker {
1926 tx: Some(tx),
1927 sent: false,
1928 }
1929 .await;
1930 });
1931
1932 utils::reschedule().await;
1934
1935 let waker = rx.await.expect("waker not received");
1937
1938 WakeOnDrop(Some(waker))
1940 });
1941
1942 drop(holder);
1945 }
1946
1947 fn test_metrics<R: Runner>(runner: R)
1948 where
1949 R::Context: Metrics,
1950 {
1951 runner.start(|context| async move {
1952 assert_eq!(context.label(), "");
1954
1955 let counter = Counter::<u64>::default();
1957 context.register("test", "test", counter.clone());
1958
1959 counter.inc();
1961
1962 let buffer = context.encode();
1964 assert!(buffer.contains("test_total 1"));
1965
1966 let context = context.with_label("nested");
1968 let nested_counter = Counter::<u64>::default();
1969 context.register("test", "test", nested_counter.clone());
1970
1971 nested_counter.inc();
1973
1974 let buffer = context.encode();
1976 assert!(buffer.contains("nested_test_total 1"));
1977 assert!(buffer.contains("test_total 1"));
1978 });
1979 }
1980
1981 fn test_metrics_label<R: Runner>(runner: R)
1982 where
1983 R::Context: Metrics,
1984 {
1985 runner.start(|context| async move {
1986 context.with_label(METRICS_PREFIX);
1987 })
1988 }
1989
1990 fn test_metrics_label_empty<R: Runner>(runner: R)
1991 where
1992 R::Context: Metrics,
1993 {
1994 runner.start(|context| async move {
1995 context.with_label("");
1996 })
1997 }
1998
1999 fn test_metrics_label_invalid_first_char<R: Runner>(runner: R)
2000 where
2001 R::Context: Metrics,
2002 {
2003 runner.start(|context| async move {
2004 context.with_label("1invalid");
2005 })
2006 }
2007
2008 fn test_metrics_label_invalid_char<R: Runner>(runner: R)
2009 where
2010 R::Context: Metrics,
2011 {
2012 runner.start(|context| async move {
2013 context.with_label("invalid-label");
2014 })
2015 }
2016
/// Runs the shared `test_error_future` scenario on the deterministic runner.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
2022
/// Runs the shared `test_clock_sleep` scenario on the deterministic runner.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
2028
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic runner.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
2034
/// Runs the shared `test_clock_timeout` scenario on the deterministic runner.
#[test]
fn test_deterministic_clock_timeout() {
    test_clock_timeout(deterministic::Runner::default());
}
2040
/// Runs the shared `test_root_finishes` scenario on the deterministic runner.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
2046
/// Runs the shared `test_spawn_after_abort` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_after_abort() {
    test_spawn_after_abort(deterministic::Runner::default());
}
2052
/// Runs the shared `test_spawn_abort` scenario on the deterministic runner with both
/// flags set to `false`.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default(), false, false);
}
2058
/// Runs `test_panic_aborts_root` on the deterministic runner; a panic containing `blah`
/// is expected.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
2065
/// Runs `test_panic_aborts_root` on a deterministic runner built with
/// `with_catch_panics(true)`; a panic containing `blah` is still expected.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_root_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
2073
/// Runs `test_panic_aborts_spawn` on the deterministic runner; a panic containing `blah`
/// is expected.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
2080
/// Runs `test_panic_aborts_spawn_caught` on a deterministic runner built with
/// `with_catch_panics(true)`.
#[test]
fn test_deterministic_panic_aborts_spawn_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
2087
/// Runs `test_multiple_panics` on the deterministic runner; a panic containing `boom` is
/// expected.
#[test]
#[should_panic(expected = "boom")]
fn test_deterministic_multiple_panics() {
    test_multiple_panics(deterministic::Runner::default());
}
2094
/// Runs `test_multiple_panics_caught` on a deterministic runner built with
/// `with_catch_panics(true)`.
#[test]
fn test_deterministic_multiple_panics_caught() {
    let runner =
        deterministic::Runner::new(deterministic::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
2101
/// Runs the shared `test_select` scenario on the deterministic runner.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
2107
/// Runs the shared `test_select_loop` scenario on the deterministic runner.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
2113
/// Runs the shared `test_storage_operations` scenario on the deterministic runner.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
2119
/// Runs the shared `test_blob_read_write` scenario on the deterministic runner.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
2125
/// Runs the shared `test_blob_resize` scenario on the deterministic runner.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
2131
/// Runs the shared `test_many_partition_read_write` scenario on the deterministic runner.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
2137
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic runner.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
2143
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
2150
/// Runs the shared `test_shutdown` scenario on the deterministic runner.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
2156
/// Runs the shared `test_shutdown_multiple_signals` scenario on the deterministic runner.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
2162
/// Runs the shared `test_shutdown_timeout` scenario on the deterministic runner.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
2168
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
2174
/// Runs the shared `test_unfulfilled_shutdown` scenario on the deterministic runner.
#[test]
fn test_deterministic_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(deterministic::Runner::default());
}
2180
/// Runs the shared `test_spawn_dedicated` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_dedicated() {
    test_spawn_dedicated(deterministic::Runner::default());
}
2186
/// Runs the shared `test_spawn` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn() {
    test_spawn(deterministic::Runner::default());
}
2192
/// Runs the shared `test_spawn_abort_on_parent_abort` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(deterministic::Runner::default());
}
2198
/// Runs the shared `test_spawn_abort_on_parent_completion` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(deterministic::Runner::default());
}
2204
/// Runs the shared `test_spawn_cascading_abort` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_cascading_abort() {
    test_spawn_cascading_abort(deterministic::Runner::default());
}
2210
/// Runs the shared `test_child_survives_sibling_completion` scenario on the deterministic
/// runner.
#[test]
fn test_deterministic_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(deterministic::Runner::default());
}
2216
/// Runs the shared `test_spawn_clone_chain` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_clone_chain() {
    test_spawn_clone_chain(deterministic::Runner::default());
}
2222
/// Runs the shared `test_spawn_sparse_clone_chain` scenario on the deterministic runner.
#[test]
fn test_deterministic_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(deterministic::Runner::default());
}
2228
/// Runs `test_spawn_blocking` on a fresh deterministic runner for `dedicated = false`
/// and then `dedicated = true`.
#[test]
fn test_deterministic_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    }
}
2236
/// Verifies that a panicking spawned task unwinds out of the deterministic runner for
/// both `dedicated = false` and `dedicated = true`.
///
/// Each variant runs under `catch_unwind`. With `#[should_panic]` on the whole test, the
/// first iteration's panic ended the test and the second variant was never executed.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let outcome = std::panic::catch_unwind(move || {
            let executor = deterministic::Runner::default();
            test_spawn_blocking_panic(executor, dedicated);
        });
        let payload = match outcome {
            Ok(()) => panic!("expected a panic (dedicated = {dedicated})"),
            Err(payload) => payload,
        };

        // Panic payloads are `&str` for literal messages and `String` for formatted ones.
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message (dedicated = {dedicated}): {message:?}"
        );
    }
}
2245
/// Runs `test_spawn_blocking_panic_caught` for `dedicated = false` and then
/// `dedicated = true`, each on a fresh deterministic runner built with
/// `with_catch_panics(true)`.
#[test]
fn test_deterministic_spawn_blocking_panic_caught() {
    for dedicated in [false, true] {
        let runner = deterministic::Runner::new(
            deterministic::Config::default().with_catch_panics(true),
        );
        test_spawn_blocking_panic_caught(runner, dedicated);
    }
}
2254
/// Runs `test_spawn_abort` on a fresh deterministic runner for the `(dedicated, blocking)`
/// pairs `(false, true)` and `(true, false)`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    let variants = [(false, true), (true, false)];
    for (dedicated, blocking) in variants {
        test_spawn_abort(deterministic::Runner::default(), dedicated, blocking);
    }
}
2262
/// Runs the shared `test_circular_reference_prevents_cleanup` scenario on the
/// deterministic runner.
#[test]
fn test_deterministic_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(deterministic::Runner::default());
}
2268
/// Runs the shared `test_late_waker` scenario on the deterministic runner.
#[test]
fn test_deterministic_late_waker() {
    test_late_waker(deterministic::Runner::default());
}
2274
/// Runs the shared `test_metrics` scenario on the deterministic runner.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
2280
/// Runs `test_metrics_label` on the deterministic runner; any panic satisfies the test.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
2287
/// Runs `test_metrics_label_empty` on the deterministic runner and expects the
/// "label must start with [a-zA-Z]" panic.
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_deterministic_metrics_label_empty() {
    test_metrics_label_empty(deterministic::Runner::default());
}
2294
/// Runs `test_metrics_label_invalid_first_char` on the deterministic runner and expects
/// the "label must start with [a-zA-Z]" panic.
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_deterministic_metrics_label_invalid_first_char() {
    test_metrics_label_invalid_first_char(deterministic::Runner::default());
}
2301
/// Runs `test_metrics_label_invalid_char` on the deterministic runner and expects the
/// "label must only contain [a-zA-Z0-9_]" panic.
#[test]
#[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
fn test_deterministic_metrics_label_invalid_char() {
    test_metrics_label_invalid_char(deterministic::Runner::default());
}
2308
2309 #[test_collect_traces]
2310 fn test_deterministic_instrument_tasks(traces: TraceStorage) {
2311 let executor = deterministic::Runner::new(deterministic::Config::default());
2312 executor.start(|context| async move {
2313 context
2314 .with_label("test")
2315 .instrumented()
2316 .spawn(|context| async move {
2317 tracing::info!(field = "test field", "test log");
2318
2319 context
2320 .with_label("inner")
2321 .instrumented()
2322 .spawn(|_| async move {
2323 tracing::info!("inner log");
2324 })
2325 .await
2326 .unwrap();
2327 })
2328 .await
2329 .unwrap();
2330 });
2331
2332 let info_traces = traces.get_by_level(Level::INFO);
2333 assert_eq!(info_traces.len(), 2);
2334
2335 info_traces
2337 .expect_event_at_index(0, |event| {
2338 event.metadata.expect_content_exact("test log")?;
2339 event.metadata.expect_field_count(1)?;
2340 event.metadata.expect_field_exact("field", "test field")?;
2341 event.expect_span_count(1)?;
2342 event.expect_span_at_index(0, |span| {
2343 span.expect_content_exact("task")?;
2344 span.expect_field_count(1)?;
2345 span.expect_field_exact("name", "test")
2346 })
2347 })
2348 .unwrap();
2349
2350 info_traces
2351 .expect_event_at_index(1, |event| {
2352 event.metadata.expect_content_exact("inner log")?;
2353 event.metadata.expect_field_count(0)?;
2354 event.expect_span_count(1)?;
2355 event.expect_span_at_index(0, |span| {
2356 span.expect_content_exact("task")?;
2357 span.expect_field_count(1)?;
2358 span.expect_field_exact("name", "test_inner")
2359 })
2360 })
2361 .unwrap();
2362 }
2363
/// Exercises the deterministic resolver: a registered host resolves to its addresses,
/// while an unknown host and a host re-registered with `None` both fail to resolve.
#[test]
fn test_deterministic_resolver() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Register two addresses for the host.
        let first = "192.168.1.1".parse::<IpAddr>().unwrap();
        let second = "192.168.1.2".parse::<IpAddr>().unwrap();
        context.resolver_register("example.com", Some(vec![first, second]));

        // The registered host resolves to exactly those addresses, in order.
        let resolved = context.resolve("example.com").await.unwrap();
        assert_eq!(resolved, vec![first, second]);

        // A host that was never registered fails.
        assert!(matches!(
            context.resolve("unknown.com").await,
            Err(Error::ResolveFailed(_))
        ));

        // Registering `None` makes the earlier host fail as well.
        context.resolver_register("example.com", None);
        assert!(matches!(
            context.resolve("example.com").await,
            Err(Error::ResolveFailed(_))
        ));
    });
}
2387
/// Runs the shared `test_error_future` scenario on the tokio runner.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
2393
/// Runs the shared `test_clock_sleep` scenario on the tokio runner.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
2399
/// Runs the shared `test_clock_sleep_until` scenario on the tokio runner.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
2405
/// Runs the shared `test_clock_timeout` scenario on the tokio runner.
#[test]
fn test_tokio_clock_timeout() {
    test_clock_timeout(tokio::Runner::default());
}
2411
/// Runs the shared `test_root_finishes` scenario on the tokio runner.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
2417
/// Runs the shared `test_spawn_after_abort` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_after_abort() {
    test_spawn_after_abort(tokio::Runner::default());
}
2423
/// Runs the shared `test_spawn_abort` scenario on the tokio runner with both flags set
/// to `false`.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default(), false, false);
}
2429
/// Runs `test_panic_aborts_root` on the tokio runner; a panic containing `blah` is
/// expected.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
2436
/// Runs `test_panic_aborts_root` on a tokio runner built with `with_catch_panics(true)`;
/// a panic containing `blah` is still expected.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_root_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_root(runner);
}
2444
/// Runs `test_panic_aborts_spawn` on the tokio runner; a panic containing `blah` is
/// expected.
#[test]
#[should_panic(expected = "blah")]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
2451
/// Runs `test_panic_aborts_spawn_caught` on a tokio runner built with
/// `with_catch_panics(true)`.
#[test]
fn test_tokio_panic_aborts_spawn_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_panic_aborts_spawn_caught(runner);
}
2458
/// Runs `test_multiple_panics` on the tokio runner; a panic containing `boom` is
/// expected.
#[test]
#[should_panic(expected = "boom")]
fn test_tokio_multiple_panics() {
    test_multiple_panics(tokio::Runner::default());
}
2465
/// Runs `test_multiple_panics_caught` on a tokio runner built with
/// `with_catch_panics(true)`.
#[test]
fn test_tokio_multiple_panics_caught() {
    let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
    test_multiple_panics_caught(runner);
}
2472
/// Runs the shared `test_select` scenario on the tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
2478
/// Runs the shared `test_select_loop` scenario on the tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
2484
/// Runs the shared `test_storage_operations` scenario on the tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
2490
/// Runs the shared `test_blob_read_write` scenario on the tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
2496
/// Runs the shared `test_blob_resize` scenario on the tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
2502
/// Runs the shared `test_many_partition_read_write` scenario on the tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
2508
/// Runs the shared `test_blob_read_past_length` scenario on the tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
2514
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the tokio runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
2521
/// Runs the shared `test_shutdown` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
2527
/// Runs the shared `test_shutdown_multiple_signals` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
2533
/// Runs the shared `test_shutdown_timeout` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
2539
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on the tokio runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
2545
/// Runs the shared `test_unfulfilled_shutdown` scenario on the tokio runner.
#[test]
fn test_tokio_unfulfilled_shutdown() {
    test_unfulfilled_shutdown(tokio::Runner::default());
}
2551
/// Runs the shared `test_spawn_dedicated` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_dedicated() {
    test_spawn_dedicated(tokio::Runner::default());
}
2557
/// Runs the shared `test_spawn` scenario on the tokio runner.
#[test]
fn test_tokio_spawn() {
    test_spawn(tokio::Runner::default());
}
2563
/// Runs the shared `test_spawn_abort_on_parent_abort` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_abort() {
    test_spawn_abort_on_parent_abort(tokio::Runner::default());
}
2569
/// Runs the shared `test_spawn_abort_on_parent_completion` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_abort_on_parent_completion() {
    test_spawn_abort_on_parent_completion(tokio::Runner::default());
}
2575
/// Runs the shared `test_spawn_cascading_abort` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_cascading_abort() {
    test_spawn_cascading_abort(tokio::Runner::default());
}
2581
/// Runs the shared `test_child_survives_sibling_completion` scenario on the tokio runner.
#[test]
fn test_tokio_child_survives_sibling_completion() {
    test_child_survives_sibling_completion(tokio::Runner::default());
}
2587
/// Runs the shared `test_spawn_clone_chain` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_clone_chain() {
    test_spawn_clone_chain(tokio::Runner::default());
}
2593
/// Runs the shared `test_spawn_sparse_clone_chain` scenario on the tokio runner.
#[test]
fn test_tokio_spawn_sparse_clone_chain() {
    test_spawn_sparse_clone_chain(tokio::Runner::default());
}
2599
/// Runs `test_spawn_blocking` on a fresh tokio runner for `dedicated = false` and then
/// `dedicated = true`.
#[test]
fn test_tokio_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    }
}
2607
/// Verifies that a panicking spawned task unwinds out of the tokio runner for both
/// `dedicated = false` and `dedicated = true`.
///
/// Each variant runs under `catch_unwind`. With `#[should_panic]` on the whole test, the
/// first iteration's panic ended the test and the second variant was never executed.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let outcome = std::panic::catch_unwind(move || {
            let executor = tokio::Runner::default();
            test_spawn_blocking_panic(executor, dedicated);
        });
        let payload = match outcome {
            Ok(()) => panic!("expected a panic (dedicated = {dedicated})"),
            Err(payload) => payload,
        };

        // Panic payloads are `&str` for literal messages and `String` for formatted ones.
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message (dedicated = {dedicated}): {message:?}"
        );
    }
}
2616
/// Runs `test_spawn_blocking_panic_caught` for `dedicated = false` and then
/// `dedicated = true`, each on a fresh tokio runner built with `with_catch_panics(true)`.
#[test]
fn test_tokio_spawn_blocking_panic_caught() {
    for dedicated in [false, true] {
        let runner = tokio::Runner::new(tokio::Config::default().with_catch_panics(true));
        test_spawn_blocking_panic_caught(runner, dedicated);
    }
}
2625
/// Runs `test_spawn_abort` on a fresh tokio runner for the `(dedicated, blocking)` pairs
/// `(false, true)` and `(true, false)`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    let variants = [(false, true), (true, false)];
    for (dedicated, blocking) in variants {
        test_spawn_abort(tokio::Runner::default(), dedicated, blocking);
    }
}
2633
/// Runs the shared `test_circular_reference_prevents_cleanup` scenario on the tokio
/// runner.
#[test]
fn test_tokio_circular_reference_prevents_cleanup() {
    test_circular_reference_prevents_cleanup(tokio::Runner::default());
}
2639
/// Runs the shared `test_late_waker` scenario on the tokio runner.
#[test]
fn test_tokio_late_waker() {
    test_late_waker(tokio::Runner::default());
}
2645
/// Runs the shared `test_metrics` scenario on the tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
2651
/// Runs `test_metrics_label` on the tokio runner; any panic satisfies the test.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
2658
/// Runs `test_metrics_label_empty` on the tokio runner and expects the
/// "label must start with [a-zA-Z]" panic.
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_tokio_metrics_label_empty() {
    test_metrics_label_empty(tokio::Runner::default());
}
2665
/// Runs `test_metrics_label_invalid_first_char` on the tokio runner and expects the
/// "label must start with [a-zA-Z]" panic.
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_tokio_metrics_label_invalid_first_char() {
    test_metrics_label_invalid_first_char(tokio::Runner::default());
}
2672
/// Runs `test_metrics_label_invalid_char` on the tokio runner and expects the
/// "label must only contain [a-zA-Z0-9_]" panic.
#[test]
#[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
fn test_tokio_metrics_label_invalid_char() {
    test_metrics_label_invalid_char(tokio::Runner::default());
}
2679
/// Polls the encoded metrics until an unlabelled `runtime_process_rss` sample with a
/// positive value shows up.
///
/// The loop sleeps between every attempt. Previously it only slept while the metric name
/// was absent, so once the name appeared without a positive sample it re-encoded in a
/// tight loop that never awaited.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Sample lines start with the metric name and carry the value as the second
            // whitespace-separated token; labelled samples (`name{...}`) are skipped.
            let has_positive_rss = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                .filter_map(|line| line.split_whitespace().nth(1))
                .map(|value| value.parse::<i64>().expect("Failed to parse RSS value"))
                .any(|rss| rss > 0);
            if has_positive_rss {
                return;
            }

            // Not reported yet: yield to the runtime before trying again.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
2710
/// Initializes tokio telemetry with a metrics address, registers a counter, and then
/// fetches `/metrics` from that address over a raw connection to check the counter is
/// reported.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Address handed to telemetry and later dialed by the client.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        let logging = tokio::telemetry::Logging {
            level: Level::INFO,
            json: false,
        };
        tokio::telemetry::init(context.with_label("metrics"), logging, Some(address), None);

        // Register a counter and bump it once so it shows up with value 1.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes one at a time up to `\n`, dropping a trailing `\r`.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line: Vec<u8> = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                match byte[0] {
                    b'\n' => {
                        if line.last().copied() == Some(b'\r') {
                            line.pop();
                        }
                        break;
                    }
                    other => line.push(other),
                }
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads `name: value` lines into a map until an empty line is reached.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                // Lines without a `": "` separator are ignored.
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_string(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Receives `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
        }

        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial until it succeeds.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok(connection) => break connection,
                        Err(e) => {
                            error!(err =?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send a minimal GET request for the metrics path.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Status line first.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Then the headers, which must carry the body length.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let raw_length = headers.get("content-length").unwrap();
                let content_length = raw_length.parse::<usize>().unwrap();

                // Finally the body, which must include the registered counter.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        client_handle.await.unwrap();
    });
}
2818
/// Resolves `localhost` on the tokio runner and checks that at least one address comes
/// back and that every address is the IPv4 or IPv6 localhost constant.
#[test]
fn test_tokio_resolver() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        let v4 = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let v6 = IpAddr::V6(Ipv6Addr::LOCALHOST);

        let resolved = context.resolve("localhost").await.unwrap();
        assert!(!resolved.is_empty());
        for addr in resolved {
            assert!(addr == v4 || addr == v6);
        }
    });
}
2833}