1use commonware_utils::{StableBuf, StableBufMut};
21use prometheus_client::registry::Metric;
22use std::io::Error as IoError;
23use std::{
24 future::Future,
25 net::SocketAddr,
26 time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36 if #[cfg(not(target_arch = "wasm32"))] {
37 pub mod tokio;
38 pub mod benchmarks;
39 }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
49const METRICS_PREFIX: &str = "runtime";
51
52#[derive(Error, Debug)]
54pub enum Error {
55 #[error("exited")]
56 Exited,
57 #[error("closed")]
58 Closed,
59 #[error("timeout")]
60 Timeout,
61 #[error("bind failed")]
62 BindFailed,
63 #[error("connection failed")]
64 ConnectionFailed,
65 #[error("write failed")]
66 WriteFailed,
67 #[error("read failed")]
68 ReadFailed,
69 #[error("send failed")]
70 SendFailed,
71 #[error("recv failed")]
72 RecvFailed,
73 #[error("partition creation failed: {0}")]
74 PartitionCreationFailed(String),
75 #[error("partition missing: {0}")]
76 PartitionMissing(String),
77 #[error("partition corrupt: {0}")]
78 PartitionCorrupt(String),
79 #[error("blob open failed: {0}/{1} error: {2}")]
80 BlobOpenFailed(String, String, IoError),
81 #[error("blob missing: {0}/{1}")]
82 BlobMissing(String, String),
83 #[error("blob truncate failed: {0}/{1} error: {2}")]
84 BlobTruncateFailed(String, String, IoError),
85 #[error("blob sync failed: {0}/{1} error: {2}")]
86 BlobSyncFailed(String, String, IoError),
87 #[error("blob close failed: {0}/{1} error: {2}")]
88 BlobCloseFailed(String, String, IoError),
89 #[error("blob insufficient length")]
90 BlobInsufficientLength,
91 #[error("offset overflow")]
92 OffsetOverflow,
93}
94
95pub trait Runner {
98 type Context;
104
105 fn start<F, Fut>(self, f: F) -> Fut::Output
107 where
108 F: FnOnce(Self::Context) -> Fut,
109 Fut: Future;
110}
111
112pub trait Spawner: Clone + Send + Sync + 'static {
114 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
122 where
123 F: FnOnce(Self) -> Fut + Send + 'static,
124 Fut: Future<Output = T> + Send + 'static,
125 T: Send + 'static;
126
127 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
136 where
137 F: Future<Output = T> + Send + 'static,
138 T: Send + 'static;
139
140 fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
159 where
160 F: FnOnce(Self) -> T + Send + 'static,
161 T: Send + 'static;
162
163 fn spawn_blocking_ref<F, T>(
172 &mut self,
173 dedicated: bool,
174 ) -> impl FnOnce(F) -> Handle<T> + 'static
175 where
176 F: FnOnce() -> T + Send + 'static,
177 T: Send + 'static;
178
179 fn stop(&self, value: i32);
186
187 fn stopped(&self) -> Signal;
192}
193
194pub trait Metrics: Clone + Send + Sync + 'static {
196 fn label(&self) -> String;
198
199 fn with_label(&self, label: &str) -> Self;
207
208 fn scoped_label(&self, label: &str) -> String {
212 let label = if self.label().is_empty() {
213 label.to_string()
214 } else {
215 format!("{}_{}", self.label(), label)
216 };
217 assert!(
218 !label.starts_with(METRICS_PREFIX),
219 "using runtime label is not allowed"
220 );
221 label
222 }
223
224 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
228
229 fn encode(&self) -> String;
231}
232
233pub trait Clock: Clone + Send + Sync + 'static {
239 fn current(&self) -> SystemTime;
241
242 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
244
245 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
247}
248
249pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
251
252pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
254
255pub type ListenerOf<N> = <N as crate::Network>::Listener;
257
258pub trait Network: Clone + Send + Sync + 'static {
261 type Listener: Listener;
265
266 fn bind(
268 &self,
269 socket: SocketAddr,
270 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
271
272 fn dial(
274 &self,
275 socket: SocketAddr,
276 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
277}
278
279pub trait Listener: Sync + Send + 'static {
282 type Sink: Sink;
285 type Stream: Stream;
288
289 fn accept(
291 &mut self,
292 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
293
294 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
296}
297
298pub trait Sink: Sync + Send + 'static {
301 fn send<B: StableBuf>(&mut self, msg: B) -> impl Future<Output = Result<(), Error>> + Send;
303}
304
305pub trait Stream: Sync + Send + 'static {
308 fn recv<B: StableBufMut>(&mut self, buf: B) -> impl Future<Output = Result<B, Error>> + Send;
311}
312
313pub trait Storage: Clone + Send + Sync + 'static {
321 type Blob: Blob;
323
324 fn open(
330 &self,
331 partition: &str,
332 name: &[u8],
333 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
334
335 fn remove(
339 &self,
340 partition: &str,
341 name: Option<&[u8]>,
342 ) -> impl Future<Output = Result<(), Error>> + Send;
343
344 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
346}
347
348#[allow(clippy::len_without_is_empty)]
359pub trait Blob: Clone + Send + Sync + 'static {
360 fn read_at<B: StableBufMut>(
365 &self,
366 buf: B,
367 offset: u64,
368 ) -> impl Future<Output = Result<B, Error>> + Send;
369
370 fn write_at<B: StableBuf>(
372 &self,
373 buf: B,
374 offset: u64,
375 ) -> impl Future<Output = Result<(), Error>> + Send;
376
377 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
379
380 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
382
383 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390 use bytes::Bytes;
391 use commonware_macros::select;
392 use futures::channel::oneshot;
393 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
394 use prometheus_client::metrics::counter::Counter;
395 use std::collections::HashMap;
396 use std::panic::{catch_unwind, AssertUnwindSafe};
397 use std::str::FromStr;
398 use std::sync::Mutex;
399 use tracing::{error, Level};
400 use utils::reschedule;
401
402 fn test_error_future<R: Runner>(runner: R) {
403 async fn error_future() -> Result<&'static str, &'static str> {
404 Err("An error occurred")
405 }
406 let result = runner.start(|_| error_future());
407 assert_eq!(result, Err("An error occurred"));
408 }
409
410 fn test_clock_sleep<R: Runner>(runner: R)
411 where
412 R::Context: Spawner + Clock,
413 {
414 runner.start(|context| async move {
415 let start = context.current();
417 let sleep_duration = Duration::from_millis(10);
418 context.sleep(sleep_duration).await;
419
420 let end = context.current();
422 assert!(end.duration_since(start).unwrap() >= sleep_duration);
423 });
424 }
425
426 fn test_clock_sleep_until<R: Runner>(runner: R)
427 where
428 R::Context: Spawner + Clock,
429 {
430 runner.start(|context| async move {
431 let now = context.current();
433 context.sleep_until(now + Duration::from_millis(100)).await;
434
435 let elapsed = now.elapsed().unwrap();
437 assert!(elapsed >= Duration::from_millis(100));
438 });
439 }
440
441 fn test_root_finishes<R: Runner>(runner: R)
442 where
443 R::Context: Spawner,
444 {
445 runner.start(|context| async move {
446 context.spawn(|_| async move {
447 loop {
448 reschedule().await;
449 }
450 });
451 });
452 }
453
454 fn test_spawn_abort<R: Runner>(runner: R)
455 where
456 R::Context: Spawner,
457 {
458 runner.start(|context| async move {
459 let handle = context.spawn(|_| async move {
460 loop {
461 reschedule().await;
462 }
463 });
464 handle.abort();
465 assert!(matches!(handle.await, Err(Error::Closed)));
466 });
467 }
468
469 fn test_panic_aborts_root<R: Runner>(runner: R) {
470 let result = catch_unwind(AssertUnwindSafe(|| {
471 runner.start(|_| async move {
472 panic!("blah");
473 });
474 }));
475 result.unwrap_err();
476 }
477
478 fn test_panic_aborts_spawn<R: Runner>(runner: R)
479 where
480 R::Context: Spawner,
481 {
482 let result = runner.start(|context| async move {
483 let result = context.spawn(|_| async move {
484 panic!("blah");
485 });
486 assert!(matches!(result.await, Err(Error::Exited)));
487 Result::<(), Error>::Ok(())
488 });
489
490 result.unwrap();
492 }
493
494 fn test_select<R: Runner>(runner: R) {
495 runner.start(|_| async move {
496 let output = Mutex::new(0);
498 select! {
499 v1 = ready(1) => {
500 *output.lock().unwrap() = v1;
501 },
502 v2 = ready(2) => {
503 *output.lock().unwrap() = v2;
504 },
505 };
506 assert_eq!(*output.lock().unwrap(), 1);
507
508 select! {
510 v1 = std::future::pending::<i32>() => {
511 *output.lock().unwrap() = v1;
512 },
513 v2 = ready(2) => {
514 *output.lock().unwrap() = v2;
515 },
516 };
517 assert_eq!(*output.lock().unwrap(), 2);
518 });
519 }
520
521 fn test_select_loop<R: Runner>(runner: R)
523 where
524 R::Context: Clock,
525 {
526 runner.start(|context| async move {
527 let (mut sender, mut receiver) = mpsc::unbounded();
529 for _ in 0..2 {
530 select! {
531 v = receiver.next() => {
532 panic!("unexpected value: {:?}", v);
533 },
534 _ = context.sleep(Duration::from_millis(100)) => {
535 continue;
536 },
537 };
538 }
539
540 sender.send(0).await.unwrap();
542 sender.send(1).await.unwrap();
543
544 select! {
546 _ = async {} => {
547 },
549 v = receiver.next() => {
550 panic!("unexpected value: {:?}", v);
551 },
552 };
553
554 for i in 0..2 {
556 select! {
557 _ = context.sleep(Duration::from_millis(100)) => {
558 panic!("timeout");
559 },
560 v = receiver.next() => {
561 assert_eq!(v.unwrap(), i);
562 },
563 };
564 }
565 });
566 }
567
568 fn test_storage_operations<R: Runner>(runner: R)
569 where
570 R::Context: Storage,
571 {
572 runner.start(|context| async move {
573 let partition = "test_partition";
574 let name = b"test_blob";
575
576 let (blob, _) = context
578 .open(partition, name)
579 .await
580 .expect("Failed to open blob");
581
582 let data = b"Hello, Storage!";
584 blob.write_at(Vec::from(data), 0)
585 .await
586 .expect("Failed to write to blob");
587
588 blob.sync().await.expect("Failed to sync blob");
590
591 let read = blob
593 .read_at(vec![0; data.len()], 0)
594 .await
595 .expect("Failed to read from blob");
596 assert_eq!(read, data);
597
598 blob.close().await.expect("Failed to close blob");
600
601 let blobs = context
603 .scan(partition)
604 .await
605 .expect("Failed to scan partition");
606 assert!(blobs.contains(&name.to_vec()));
607
608 let (blob, len) = context
610 .open(partition, name)
611 .await
612 .expect("Failed to reopen blob");
613 assert_eq!(len, data.len() as u64);
614
615 let read = blob
617 .read_at(vec![0u8; 7], 7)
618 .await
619 .expect("Failed to read data");
620 assert_eq!(read, b"Storage");
621
622 blob.close().await.expect("Failed to close blob");
624
625 context
627 .remove(partition, Some(name))
628 .await
629 .expect("Failed to remove blob");
630
631 let blobs = context
633 .scan(partition)
634 .await
635 .expect("Failed to scan partition");
636 assert!(!blobs.contains(&name.to_vec()));
637
638 context
640 .remove(partition, None)
641 .await
642 .expect("Failed to remove partition");
643
644 let result = context.scan(partition).await;
646 assert!(matches!(result, Err(Error::PartitionMissing(_))));
647 });
648 }
649
650 fn test_blob_read_write<R: Runner>(runner: R)
651 where
652 R::Context: Storage,
653 {
654 runner.start(|context| async move {
655 let partition = "test_partition";
656 let name = b"test_blob_rw";
657
658 let (blob, _) = context
660 .open(partition, name)
661 .await
662 .expect("Failed to open blob");
663
664 let data1 = b"Hello";
666 let data2 = b"World";
667 blob.write_at(Vec::from(data1), 0)
668 .await
669 .expect("Failed to write data1");
670 blob.write_at(Vec::from(data2), 5)
671 .await
672 .expect("Failed to write data2");
673
674 let read = blob
676 .read_at(vec![0u8; 10], 0)
677 .await
678 .expect("Failed to read data");
679 assert_eq!(&read[..5], data1);
680 assert_eq!(&read[5..], data2);
681
682 let data3 = b"Store";
684 blob.write_at(Vec::from(data3), 5)
685 .await
686 .expect("Failed to write data3");
687
688 blob.truncate(5).await.expect("Failed to truncate blob");
690 let read = blob
691 .read_at(vec![0; 5], 0)
692 .await
693 .expect("Failed to read data");
694 assert_eq!(&read[..5], data1);
695
696 let result = blob.read_at(vec![0u8; 10], 0).await;
698 assert!(result.is_err());
699
700 blob.close().await.expect("Failed to close blob");
702 });
703 }
704
705 fn test_many_partition_read_write<R: Runner>(runner: R)
706 where
707 R::Context: Storage,
708 {
709 runner.start(|context| async move {
710 let partitions = ["partition1", "partition2", "partition3"];
711 let name = b"test_blob_rw";
712 let data1 = b"Hello";
713 let data2 = b"World";
714
715 for (additional, partition) in partitions.iter().enumerate() {
716 let (blob, _) = context
718 .open(partition, name)
719 .await
720 .expect("Failed to open blob");
721
722 blob.write_at(Vec::from(data1), 0)
724 .await
725 .expect("Failed to write data1");
726 blob.write_at(Vec::from(data2), 5 + additional as u64)
727 .await
728 .expect("Failed to write data2");
729
730 blob.close().await.expect("Failed to close blob");
732 }
733
734 for (additional, partition) in partitions.iter().enumerate() {
735 let (blob, len) = context
737 .open(partition, name)
738 .await
739 .expect("Failed to open blob");
740 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
741
742 let read = blob
744 .read_at(vec![0u8; 10 + additional], 0)
745 .await
746 .expect("Failed to read data");
747 assert_eq!(&read[..5], b"Hello");
748 assert_eq!(&read[5 + additional..], b"World");
749
750 blob.close().await.expect("Failed to close blob");
752 }
753 });
754 }
755
756 fn test_blob_read_past_length<R: Runner>(runner: R)
757 where
758 R::Context: Storage,
759 {
760 runner.start(|context| async move {
761 let partition = "test_partition";
762 let name = b"test_blob_rw";
763
764 let (blob, _) = context
766 .open(partition, name)
767 .await
768 .expect("Failed to open blob");
769
770 let result = blob.read_at(vec![0u8; 10], 0).await;
772 assert!(result.is_err());
773
774 let data = b"Hello, Storage!".to_vec();
776 blob.write_at(data, 0)
777 .await
778 .expect("Failed to write to blob");
779
780 let result = blob.read_at(vec![0u8; 20], 0).await;
782 assert!(result.is_err());
783 })
784 }
785
786 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
787 where
788 R::Context: Spawner + Storage + Metrics,
789 {
790 runner.start(|context| async move {
791 let partition = "test_partition";
792 let name = b"test_blob_rw";
793
794 let (blob, _) = context
796 .open(partition, name)
797 .await
798 .expect("Failed to open blob");
799
800 let data = b"Hello, Storage!";
802 blob.write_at(Vec::from(data), 0)
803 .await
804 .expect("Failed to write to blob");
805
806 blob.sync().await.expect("Failed to sync blob");
808
809 let check1 = context.with_label("check1").spawn({
811 let blob = blob.clone();
812 move |_| async move {
813 let read = blob
814 .read_at(vec![0u8; data.len()], 0)
815 .await
816 .expect("Failed to read from blob");
817 assert_eq!(read, data);
818 }
819 });
820 let check2 = context.with_label("check2").spawn({
821 let blob = blob.clone();
822 move |_| async move {
823 let read = blob
824 .read_at(vec![0; data.len()], 0)
825 .await
826 .expect("Failed to read from blob");
827 assert_eq!(read, data);
828 }
829 });
830
831 let result = join!(check1, check2);
833 assert!(result.0.is_ok());
834 assert!(result.1.is_ok());
835
836 let read = blob
838 .read_at(vec![0; data.len()], 0)
839 .await
840 .expect("Failed to read from blob");
841 assert_eq!(read, data);
842
843 blob.close().await.expect("Failed to close blob");
845
846 let buffer = context.encode();
848 assert!(buffer.contains("open_blobs 0"));
849 });
850 }
851
852 fn test_shutdown<R: Runner>(runner: R)
853 where
854 R::Context: Spawner + Metrics + Clock,
855 {
856 let kill = 9;
857 runner.start(|context| async move {
858 let before = context
860 .with_label("before")
861 .spawn(move |context| async move {
862 let sig = context.stopped().await;
863 assert_eq!(sig.unwrap(), kill);
864 });
865
866 let after = context
868 .with_label("after")
869 .spawn(move |context| async move {
870 let mut signal = context.stopped();
872 loop {
873 select! {
874 sig = &mut signal => {
875 assert_eq!(sig.unwrap(), kill);
877 break;
878 },
879 _ = context.sleep(Duration::from_millis(10)) => {
880 },
882 }
883 }
884 });
885
886 context.sleep(Duration::from_millis(50)).await;
888
889 context.stop(kill);
891
892 let result = join!(before, after);
894 assert!(result.0.is_ok());
895 assert!(result.1.is_ok());
896 });
897 }
898
899 fn test_spawn_ref<R: Runner>(runner: R)
900 where
901 R::Context: Spawner,
902 {
903 runner.start(|mut context| async move {
904 let handle = context.spawn_ref();
905 let result = handle(async move { 42 }).await;
906 assert!(matches!(result, Ok(42)));
907 });
908 }
909
910 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
911 where
912 R::Context: Spawner,
913 {
914 runner.start(|mut context| async move {
915 let handle = context.spawn_ref();
916 let result = handle(async move { 42 }).await;
917 assert!(matches!(result, Ok(42)));
918
919 let handle = context.spawn_ref();
921 let result = handle(async move { 42 }).await;
922 assert!(matches!(result, Ok(42)));
923 });
924 }
925
926 fn test_spawn_duplicate<R: Runner>(runner: R)
927 where
928 R::Context: Spawner,
929 {
930 runner.start(|mut context| async move {
931 let handle = context.spawn_ref();
932 let result = handle(async move { 42 }).await;
933 assert!(matches!(result, Ok(42)));
934
935 context.spawn(|_| async move { 42 });
937 });
938 }
939
940 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
941 where
942 R::Context: Spawner,
943 {
944 runner.start(|context| async move {
945 let handle = context.spawn_blocking(dedicated, |_| 42);
946 let result = handle.await;
947 assert!(matches!(result, Ok(42)));
948 });
949 }
950
951 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
952 where
953 R::Context: Spawner,
954 {
955 runner.start(|mut context| async move {
956 let spawn = context.spawn_blocking_ref(dedicated);
957 let handle = spawn(|| 42);
958 let result = handle.await;
959 assert!(matches!(result, Ok(42)));
960 });
961 }
962
963 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
964 where
965 R::Context: Spawner,
966 {
967 runner.start(|mut context| async move {
968 let spawn = context.spawn_blocking_ref(dedicated);
969 let result = spawn(|| 42).await;
970 assert!(matches!(result, Ok(42)));
971
972 context.spawn_blocking(dedicated, |_| 42);
974 });
975 }
976
977 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
978 where
979 R::Context: Spawner,
980 {
981 runner.start(|context| async move {
982 let (sender, mut receiver) = oneshot::channel();
984 let handle = context.spawn_blocking(dedicated, move |_| {
985 loop {
987 if receiver.try_recv().is_ok() {
988 break;
989 }
990 }
991
992 let mut count = 0;
994 loop {
995 count += 1;
996 if count >= 100_000_000 {
997 break;
998 }
999 }
1000 count
1001 });
1002
1003 handle.abort();
1009 sender.send(()).unwrap();
1010
1011 assert!(matches!(handle.await, Ok(100_000_000)));
1013 });
1014 }
1015
1016 fn test_metrics<R: Runner>(runner: R)
1017 where
1018 R::Context: Metrics,
1019 {
1020 runner.start(|context| async move {
1021 assert_eq!(context.label(), "");
1023
1024 let counter = Counter::<u64>::default();
1026 context.register("test", "test", counter.clone());
1027
1028 counter.inc();
1030
1031 let buffer = context.encode();
1033 assert!(buffer.contains("test_total 1"));
1034
1035 let context = context.with_label("nested");
1037 let nested_counter = Counter::<u64>::default();
1038 context.register("test", "test", nested_counter.clone());
1039
1040 nested_counter.inc();
1042
1043 let buffer = context.encode();
1045 assert!(buffer.contains("nested_test_total 1"));
1046 assert!(buffer.contains("test_total 1"));
1047 });
1048 }
1049
1050 fn test_metrics_label<R: Runner>(runner: R)
1051 where
1052 R::Context: Metrics,
1053 {
1054 runner.start(|context| async move {
1055 context.with_label(METRICS_PREFIX);
1056 })
1057 }
1058
1059 #[test]
1060 fn test_deterministic_future() {
1061 let runner = deterministic::Runner::default();
1062 test_error_future(runner);
1063 }
1064
1065 #[test]
1066 fn test_deterministic_clock_sleep() {
1067 let executor = deterministic::Runner::default();
1068 test_clock_sleep(executor);
1069 }
1070
1071 #[test]
1072 fn test_deterministic_clock_sleep_until() {
1073 let executor = deterministic::Runner::default();
1074 test_clock_sleep_until(executor);
1075 }
1076
1077 #[test]
1078 fn test_deterministic_root_finishes() {
1079 let executor = deterministic::Runner::default();
1080 test_root_finishes(executor);
1081 }
1082
1083 #[test]
1084 fn test_deterministic_spawn_abort() {
1085 let executor = deterministic::Runner::default();
1086 test_spawn_abort(executor);
1087 }
1088
1089 #[test]
1090 fn test_deterministic_panic_aborts_root() {
1091 let runner = deterministic::Runner::default();
1092 test_panic_aborts_root(runner);
1093 }
1094
1095 #[test]
1096 #[should_panic(expected = "blah")]
1097 fn test_deterministic_panic_aborts_spawn() {
1098 let executor = deterministic::Runner::default();
1099 test_panic_aborts_spawn(executor);
1100 }
1101
1102 #[test]
1103 fn test_deterministic_select() {
1104 let executor = deterministic::Runner::default();
1105 test_select(executor);
1106 }
1107
1108 #[test]
1109 fn test_deterministic_select_loop() {
1110 let executor = deterministic::Runner::default();
1111 test_select_loop(executor);
1112 }
1113
1114 #[test]
1115 fn test_deterministic_storage_operations() {
1116 let executor = deterministic::Runner::default();
1117 test_storage_operations(executor);
1118 }
1119
1120 #[test]
1121 fn test_deterministic_blob_read_write() {
1122 let executor = deterministic::Runner::default();
1123 test_blob_read_write(executor);
1124 }
1125
1126 #[test]
1127 fn test_deterministic_many_partition_read_write() {
1128 let executor = deterministic::Runner::default();
1129 test_many_partition_read_write(executor);
1130 }
1131
1132 #[test]
1133 fn test_deterministic_blob_read_past_length() {
1134 let executor = deterministic::Runner::default();
1135 test_blob_read_past_length(executor);
1136 }
1137
1138 #[test]
1139 fn test_deterministic_blob_clone_and_concurrent_read() {
1140 let executor = deterministic::Runner::default();
1142 test_blob_clone_and_concurrent_read(executor);
1143 }
1144
1145 #[test]
1146 fn test_deterministic_shutdown() {
1147 let executor = deterministic::Runner::default();
1148 test_shutdown(executor);
1149 }
1150
1151 #[test]
1152 fn test_deterministic_spawn_ref() {
1153 let executor = deterministic::Runner::default();
1154 test_spawn_ref(executor);
1155 }
1156
1157 #[test]
1158 #[should_panic]
1159 fn test_deterministic_spawn_ref_duplicate() {
1160 let executor = deterministic::Runner::default();
1161 test_spawn_ref_duplicate(executor);
1162 }
1163
1164 #[test]
1165 #[should_panic]
1166 fn test_deterministic_spawn_duplicate() {
1167 let executor = deterministic::Runner::default();
1168 test_spawn_duplicate(executor);
1169 }
1170
1171 #[test]
1172 fn test_deterministic_spawn_blocking() {
1173 for dedicated in [false, true] {
1174 let executor = deterministic::Runner::default();
1175 test_spawn_blocking(executor, dedicated);
1176 }
1177 }
1178
1179 #[test]
1180 #[should_panic(expected = "blocking task panicked")]
1181 fn test_deterministic_spawn_blocking_panic() {
1182 for dedicated in [false, true] {
1183 let executor = deterministic::Runner::default();
1184 executor.start(|context| async move {
1185 let handle = context.spawn_blocking(dedicated, |_| {
1186 panic!("blocking task panicked");
1187 });
1188 handle.await.unwrap();
1189 });
1190 }
1191 }
1192
1193 #[test]
1194 fn test_deterministic_spawn_blocking_abort() {
1195 for dedicated in [false, true] {
1196 let executor = deterministic::Runner::default();
1197 test_spawn_blocking_abort(executor, dedicated);
1198 }
1199 }
1200
1201 #[test]
1202 fn test_deterministic_spawn_blocking_ref() {
1203 for dedicated in [false, true] {
1204 let executor = deterministic::Runner::default();
1205 test_spawn_blocking_ref(executor, dedicated);
1206 }
1207 }
1208
1209 #[test]
1210 #[should_panic]
1211 fn test_deterministic_spawn_blocking_ref_duplicate() {
1212 for dedicated in [false, true] {
1213 let executor = deterministic::Runner::default();
1214 test_spawn_blocking_ref_duplicate(executor, dedicated);
1215 }
1216 }
1217
1218 #[test]
1219 fn test_deterministic_metrics() {
1220 let executor = deterministic::Runner::default();
1221 test_metrics(executor);
1222 }
1223
1224 #[test]
1225 #[should_panic]
1226 fn test_deterministic_metrics_label() {
1227 let executor = deterministic::Runner::default();
1228 test_metrics_label(executor);
1229 }
1230
1231 #[test]
1232 fn test_tokio_error_future() {
1233 let runner = tokio::Runner::default();
1234 test_error_future(runner);
1235 }
1236
1237 #[test]
1238 fn test_tokio_clock_sleep() {
1239 let executor = tokio::Runner::default();
1240 test_clock_sleep(executor);
1241 }
1242
1243 #[test]
1244 fn test_tokio_clock_sleep_until() {
1245 let executor = tokio::Runner::default();
1246 test_clock_sleep_until(executor);
1247 }
1248
1249 #[test]
1250 fn test_tokio_root_finishes() {
1251 let executor = tokio::Runner::default();
1252 test_root_finishes(executor);
1253 }
1254
1255 #[test]
1256 fn test_tokio_spawn_abort() {
1257 let executor = tokio::Runner::default();
1258 test_spawn_abort(executor);
1259 }
1260
1261 #[test]
1262 fn test_tokio_panic_aborts_root() {
1263 let executor = tokio::Runner::default();
1264 test_panic_aborts_root(executor);
1265 }
1266
1267 #[test]
1268 fn test_tokio_panic_aborts_spawn() {
1269 let executor = tokio::Runner::default();
1270 test_panic_aborts_spawn(executor);
1271 }
1272
1273 #[test]
1274 fn test_tokio_select() {
1275 let executor = tokio::Runner::default();
1276 test_select(executor);
1277 }
1278
1279 #[test]
1280 fn test_tokio_select_loop() {
1281 let executor = tokio::Runner::default();
1282 test_select_loop(executor);
1283 }
1284
1285 #[test]
1286 fn test_tokio_storage_operations() {
1287 let executor = tokio::Runner::default();
1288 test_storage_operations(executor);
1289 }
1290
1291 #[test]
1292 fn test_tokio_blob_read_write() {
1293 let executor = tokio::Runner::default();
1294 test_blob_read_write(executor);
1295 }
1296
1297 #[test]
1298 fn test_tokio_many_partition_read_write() {
1299 let executor = tokio::Runner::default();
1300 test_many_partition_read_write(executor);
1301 }
1302
1303 #[test]
1304 fn test_tokio_blob_read_past_length() {
1305 let executor = tokio::Runner::default();
1306 test_blob_read_past_length(executor);
1307 }
1308
1309 #[test]
1310 fn test_tokio_blob_clone_and_concurrent_read() {
1311 let executor = tokio::Runner::default();
1313 test_blob_clone_and_concurrent_read(executor);
1314 }
1315
1316 #[test]
1317 fn test_tokio_shutdown() {
1318 let executor = tokio::Runner::default();
1319 test_shutdown(executor);
1320 }
1321
1322 #[test]
1323 fn test_tokio_spawn_ref() {
1324 let executor = tokio::Runner::default();
1325 test_spawn_ref(executor);
1326 }
1327
1328 #[test]
1329 #[should_panic]
1330 fn test_tokio_spawn_ref_duplicate() {
1331 let executor = tokio::Runner::default();
1332 test_spawn_ref_duplicate(executor);
1333 }
1334
1335 #[test]
1336 #[should_panic]
1337 fn test_tokio_spawn_duplicate() {
1338 let executor = tokio::Runner::default();
1339 test_spawn_duplicate(executor);
1340 }
1341
1342 #[test]
1343 fn test_tokio_spawn_blocking() {
1344 for dedicated in [false, true] {
1345 let executor = tokio::Runner::default();
1346 test_spawn_blocking(executor, dedicated);
1347 }
1348 }
1349
1350 #[test]
1351 fn test_tokio_spawn_blocking_panic() {
1352 for dedicated in [false, true] {
1353 let executor = tokio::Runner::default();
1354 executor.start(|context| async move {
1355 let handle = context.spawn_blocking(dedicated, |_| {
1356 panic!("blocking task panicked");
1357 });
1358 let result = handle.await;
1359 assert!(matches!(result, Err(Error::Exited)));
1360 });
1361 }
1362 }
1363
1364 #[test]
1365 fn test_tokio_spawn_blocking_abort() {
1366 for dedicated in [false, true] {
1367 let executor = tokio::Runner::default();
1368 test_spawn_blocking_abort(executor, dedicated);
1369 }
1370 }
1371
1372 #[test]
1373 fn test_tokio_spawn_blocking_ref() {
1374 for dedicated in [false, true] {
1375 let executor = tokio::Runner::default();
1376 test_spawn_blocking_ref(executor, dedicated);
1377 }
1378 }
1379
1380 #[test]
1381 #[should_panic]
1382 fn test_tokio_spawn_blocking_ref_duplicate() {
1383 for dedicated in [false, true] {
1384 let executor = tokio::Runner::default();
1385 test_spawn_blocking_ref_duplicate(executor, dedicated);
1386 }
1387 }
1388
1389 #[test]
1390 fn test_tokio_metrics() {
1391 let executor = tokio::Runner::default();
1392 test_metrics(executor);
1393 }
1394
1395 #[test]
1396 #[should_panic]
1397 fn test_tokio_metrics_label() {
1398 let executor = tokio::Runner::default();
1399 test_metrics_label(executor);
1400 }
1401
1402 #[test]
1403 fn test_tokio_telemetry() {
1404 let executor = tokio::Runner::default();
1405 executor.start(|context| async move {
1406 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1408
1409 tokio::telemetry::init(
1411 context.with_label("metrics"),
1412 tokio::telemetry::Logging {
1413 level: Level::INFO,
1414 json: false,
1415 },
1416 Some(address),
1417 None,
1418 );
1419
1420 let counter: Counter<u64> = Counter::default();
1422 context.register("test_counter", "Test counter", counter.clone());
1423 counter.inc();
1424
1425 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1427 let mut line = Vec::new();
1428 loop {
1429 let byte = stream.recv(vec![0; 1]).await?;
1430 if byte[0] == b'\n' {
1431 if line.last() == Some(&b'\r') {
1432 line.pop(); }
1434 break;
1435 }
1436 line.push(byte[0]);
1437 }
1438 String::from_utf8(line).map_err(|_| Error::ReadFailed)
1439 }
1440
1441 async fn read_headers<St: Stream>(
1442 stream: &mut St,
1443 ) -> Result<HashMap<String, String>, Error> {
1444 let mut headers = HashMap::new();
1445 loop {
1446 let line = read_line(stream).await?;
1447 if line.is_empty() {
1448 break;
1449 }
1450 let parts: Vec<&str> = line.splitn(2, ": ").collect();
1451 if parts.len() == 2 {
1452 headers.insert(parts[0].to_string(), parts[1].to_string());
1453 }
1454 }
1455 Ok(headers)
1456 }
1457
1458 async fn read_body<St: Stream>(
1459 stream: &mut St,
1460 content_length: usize,
1461 ) -> Result<String, Error> {
1462 let read = stream.recv(vec![0; content_length]).await?;
1463 String::from_utf8(read).map_err(|_| Error::ReadFailed)
1464 }
1465
1466 let client_handle = context
1468 .with_label("client")
1469 .spawn(move |context| async move {
1470 let (mut sink, mut stream) = loop {
1471 match context.dial(address).await {
1472 Ok((sink, stream)) => break (sink, stream),
1473 Err(e) => {
1474 error!(err =?e, "failed to connect");
1476 context.sleep(Duration::from_millis(10)).await;
1477 }
1478 }
1479 };
1480
1481 let request = format!(
1483 "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1484 address
1485 );
1486 sink.send(Bytes::from(request)).await.unwrap();
1487
1488 let status_line = read_line(&mut stream).await.unwrap();
1490 assert_eq!(status_line, "HTTP/1.1 200 OK");
1491
1492 let headers = read_headers(&mut stream).await.unwrap();
1494 println!("Headers: {:?}", headers);
1495 let content_length = headers
1496 .get("content-length")
1497 .unwrap()
1498 .parse::<usize>()
1499 .unwrap();
1500
1501 let body = read_body(&mut stream, content_length).await.unwrap();
1503 assert!(body.contains("test_counter_total 1"));
1504 });
1505
1506 client_handle.await.unwrap();
1508 });
1509 }
1510}