1use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::io::Error as IoError;
23use std::{
24 future::Future,
25 net::SocketAddr,
26 time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36 if #[cfg(not(target_arch = "wasm32"))] {
37 pub mod tokio;
38 pub mod benchmarks;
39 }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
/// Label prefix rejected by `Metrics::scoped_label`: any scoped label that starts with
/// this string trips its assertion.
// NOTE(review): presumably reserved for the runtime's own metrics — confirm against the implementations.
const METRICS_PREFIX: &str = "runtime";
51
/// Errors returned by the runtime interfaces declared in this module.
#[derive(Error, Debug)]
pub enum Error {
    /// A task exited without producing a value. The tokio tests in this file observe
    /// this when awaiting a task that panicked.
    #[error("exited")]
    Exited,
    /// A task was closed. The tests in this file observe this when awaiting a handle
    /// after calling `abort()` on it.
    #[error("closed")]
    Closed,
    /// An operation timed out.
    #[error("timeout")]
    Timeout,
    /// Binding a listener failed.
    #[error("bind failed")]
    BindFailed,
    /// Establishing a connection failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// A send failed.
    #[error("send failed")]
    SendFailed,
    /// A receive failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the payload is rendered after the message.
    // NOTE(review): the payload is presumably the partition name — confirm at the construction sites.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist. The tests in this file expect this from `Storage::scan`
    /// after the partition has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed. The two strings are rendered as `{0}/{1}`, followed by the
    /// underlying I/O error.
    // NOTE(review): the strings are presumably (partition, blob name) — confirm at the construction sites.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist; the two strings are rendered as `{0}/{1}`.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Truncating a blob failed; carries the underlying I/O error.
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    /// Syncing a blob failed; carries the underlying I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed; carries the underlying I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// A blob is shorter than an operation required.
    // NOTE(review): presumably returned by reads past the end of a blob (the tests only check `is_err()`) — confirm.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
}
94
/// Entry point of a runtime: drives a root future to completion.
pub trait Runner {
    /// Context handed to the root closure. The tests in this file require it to implement
    /// whichever of `Spawner`, `Clock`, `Storage` and `Metrics` they exercise.
    type Context;

    /// Calls `f` with a `Self::Context`, runs the future it returns and yields that
    /// future's output.
    ///
    /// The runner is consumed. The tests in this file expect a panic inside the root
    /// future to propagate out of `start`.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
111
/// Interface for spawning tasks and signalling shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Spawns the future produced by `f` as a task and returns a `Handle` to it.
    ///
    /// The context is consumed and passed to `f`. The tests in this file await the handle
    /// to obtain the task's result: after `abort()` they expect `Err(Error::Closed)`, and
    /// on the tokio runtime a panicking task yields `Err(Error::Exited)`.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Borrowing variant of `spawn`: returns a closure that spawns the future passed to it.
    ///
    /// The returned closure is `'static`, so it does not keep the `&mut self` borrow alive.
    /// The tests in this file expect a panic when a context that already went through
    /// `spawn_ref` is used to spawn again (through `spawn_ref` or `spawn`).
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Spawns the synchronous closure `f` as a task and returns a `Handle` to its result.
    ///
    /// The tests in this file show that `abort()` does not interrupt such a task: the
    /// handle still resolves to the closure's return value.
    // NOTE(review): `dedicated` presumably selects a dedicated thread over a shared pool — confirm against the implementations.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// Borrowing variant of `spawn_blocking`: returns a closure that spawns the blocking
    /// closure passed to it.
    ///
    /// The tests in this file expect a panic when the same context is used to spawn again
    /// afterwards.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals shutdown with `value`. The tests in this file expect every `Signal`
    /// obtained from `stopped` to then resolve to this value.
    fn stop(&self, value: i32);

    /// Returns a `Signal` that resolves once `stop` has been called; the tests in this
    /// file unwrap its output to recover the value passed to `stop`.
    fn stopped(&self) -> Signal;
}
193
194pub trait Metrics: Clone + Send + Sync + 'static {
196 fn label(&self) -> String;
198
199 fn with_label(&self, label: &str) -> Self;
207
208 fn scoped_label(&self, label: &str) -> String {
212 let label = if self.label().is_empty() {
213 label.to_string()
214 } else {
215 format!("{}_{}", self.label(), label)
216 };
217 assert!(
218 !label.starts_with(METRICS_PREFIX),
219 "using runtime label is not allowed"
220 );
221 label
222 }
223
224 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
228
229 fn encode(&self) -> String;
231}
232
/// Interface for reading the runtime's notion of time and waiting on it.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`; the tests in this file expect
    /// `current()` to have advanced by at least `duration` once it resolves.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
248
/// The `Sink` type produced by the listener of network `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;

/// The `Stream` type produced by the listener of network `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;

/// The `Listener` type of network `N`.
pub type ListenerOf<N> = <N as crate::Network>::Listener;
257
/// Interface for binding listeners and dialing remote addresses.
pub trait Network: Clone + Send + Sync + 'static {
    /// Listener type returned by `bind`; it also fixes the `Sink`/`Stream` types that
    /// `dial` returns.
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener, or to an `Error` on failure.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Connects to `socket` and resolves to the `(sink, stream)` pair of the connection,
    /// or to an `Error` on failure.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
278
/// Interface for accepting incoming connections.
pub trait Listener: Sync + Send + 'static {
    /// Sending half of an accepted connection.
    type Sink: Sink;
    /// Receiving half of an accepted connection.
    type Stream: Stream;

    /// Resolves to the peer address together with the sink and stream of the next
    /// accepted connection, or to an `Error` on failure.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the local address of this listener, or the I/O error raised while
    /// looking it up.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
297
/// Interface for the sending half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends `msg` (anything convertible into a `StableBuf`) and resolves to `Ok(())`
    /// or an `Error`.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
307
/// Interface for the receiving half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives into `buf` and resolves to a `StableBuf` holding the received bytes,
    /// or to an `Error`.
    // NOTE(review): the tests size `buf` to the exact number of bytes they expect, which
    // suggests the whole buffer is filled before resolving — confirm against the implementations.
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
318
/// Interface for opening, removing and listing blobs grouped into named partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob type returned by `open`.
    type Blob: Blob;

    /// Opens the blob `name` in `partition`, resolving to the blob and a `u64`.
    ///
    /// The tests in this file open blobs that did not exist before and expect success;
    /// on reopening they expect the `u64` to equal the number of bytes the blob holds.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition`, or the whole partition when `name` is
    /// `None` (as exercised by the tests in this file).
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resolves to the names of the blobs in `partition`.
    ///
    /// The tests in this file expect `Error::PartitionMissing` once the partition has
    /// been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
353
/// Interface for reading from and writing to a single blob.
///
/// Blobs are `Clone`; the tests in this file read concurrently through clones of one blob.
// NOTE(review): no `len` method is declared on this trait, so this `allow` looks stale — confirm before removing.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads into `buf` starting at `offset` and resolves to the filled buffer.
    ///
    /// The tests in this file expect an error when the requested range extends past the
    /// end of the blob.
    fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;

    /// Writes `buf` at `offset`.
    fn write_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Truncates the blob to `len` bytes.
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Syncs the blob.
    // NOTE(review): presumably flushes pending writes to durable storage — confirm against the implementations.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Closes the blob, consuming this handle.
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
392
393#[cfg(test)]
394mod tests {
395 use super::*;
396 use bytes::Bytes;
397 use commonware_macros::select;
398 use futures::channel::oneshot;
399 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
400 use prometheus_client::metrics::counter::Counter;
401 use std::collections::HashMap;
402 use std::panic::{catch_unwind, AssertUnwindSafe};
403 use std::str::FromStr;
404 use std::sync::Mutex;
405 use tracing::{error, Level};
406 use utils::reschedule;
407
408 fn test_error_future<R: Runner>(runner: R) {
409 async fn error_future() -> Result<&'static str, &'static str> {
410 Err("An error occurred")
411 }
412 let result = runner.start(|_| error_future());
413 assert_eq!(result, Err("An error occurred"));
414 }
415
416 fn test_clock_sleep<R: Runner>(runner: R)
417 where
418 R::Context: Spawner + Clock,
419 {
420 runner.start(|context| async move {
421 let start = context.current();
423 let sleep_duration = Duration::from_millis(10);
424 context.sleep(sleep_duration).await;
425
426 let end = context.current();
428 assert!(end.duration_since(start).unwrap() >= sleep_duration);
429 });
430 }
431
432 fn test_clock_sleep_until<R: Runner>(runner: R)
433 where
434 R::Context: Spawner + Clock,
435 {
436 runner.start(|context| async move {
437 let now = context.current();
439 context.sleep_until(now + Duration::from_millis(100)).await;
440
441 let elapsed = now.elapsed().unwrap();
443 assert!(elapsed >= Duration::from_millis(100));
444 });
445 }
446
447 fn test_root_finishes<R: Runner>(runner: R)
448 where
449 R::Context: Spawner,
450 {
451 runner.start(|context| async move {
452 context.spawn(|_| async move {
453 loop {
454 reschedule().await;
455 }
456 });
457 });
458 }
459
460 fn test_spawn_abort<R: Runner>(runner: R)
461 where
462 R::Context: Spawner,
463 {
464 runner.start(|context| async move {
465 let handle = context.spawn(|_| async move {
466 loop {
467 reschedule().await;
468 }
469 });
470 handle.abort();
471 assert!(matches!(handle.await, Err(Error::Closed)));
472 });
473 }
474
475 fn test_panic_aborts_root<R: Runner>(runner: R) {
476 let result = catch_unwind(AssertUnwindSafe(|| {
477 runner.start(|_| async move {
478 panic!("blah");
479 });
480 }));
481 result.unwrap_err();
482 }
483
484 fn test_panic_aborts_spawn<R: Runner>(runner: R)
485 where
486 R::Context: Spawner,
487 {
488 let result = runner.start(|context| async move {
489 let result = context.spawn(|_| async move {
490 panic!("blah");
491 });
492 assert!(matches!(result.await, Err(Error::Exited)));
493 Result::<(), Error>::Ok(())
494 });
495
496 result.unwrap();
498 }
499
500 fn test_select<R: Runner>(runner: R) {
501 runner.start(|_| async move {
502 let output = Mutex::new(0);
504 select! {
505 v1 = ready(1) => {
506 *output.lock().unwrap() = v1;
507 },
508 v2 = ready(2) => {
509 *output.lock().unwrap() = v2;
510 },
511 };
512 assert_eq!(*output.lock().unwrap(), 1);
513
514 select! {
516 v1 = std::future::pending::<i32>() => {
517 *output.lock().unwrap() = v1;
518 },
519 v2 = ready(2) => {
520 *output.lock().unwrap() = v2;
521 },
522 };
523 assert_eq!(*output.lock().unwrap(), 2);
524 });
525 }
526
527 fn test_select_loop<R: Runner>(runner: R)
529 where
530 R::Context: Clock,
531 {
532 runner.start(|context| async move {
533 let (mut sender, mut receiver) = mpsc::unbounded();
535 for _ in 0..2 {
536 select! {
537 v = receiver.next() => {
538 panic!("unexpected value: {:?}", v);
539 },
540 _ = context.sleep(Duration::from_millis(100)) => {
541 continue;
542 },
543 };
544 }
545
546 sender.send(0).await.unwrap();
548 sender.send(1).await.unwrap();
549
550 select! {
552 _ = async {} => {
553 },
555 v = receiver.next() => {
556 panic!("unexpected value: {:?}", v);
557 },
558 };
559
560 for i in 0..2 {
562 select! {
563 _ = context.sleep(Duration::from_millis(100)) => {
564 panic!("timeout");
565 },
566 v = receiver.next() => {
567 assert_eq!(v.unwrap(), i);
568 },
569 };
570 }
571 });
572 }
573
574 fn test_storage_operations<R: Runner>(runner: R)
575 where
576 R::Context: Storage,
577 {
578 runner.start(|context| async move {
579 let partition = "test_partition";
580 let name = b"test_blob";
581
582 let (blob, _) = context
584 .open(partition, name)
585 .await
586 .expect("Failed to open blob");
587
588 let data = b"Hello, Storage!";
590 blob.write_at(Vec::from(data), 0)
591 .await
592 .expect("Failed to write to blob");
593
594 blob.sync().await.expect("Failed to sync blob");
596
597 let read = blob
599 .read_at(vec![0; data.len()], 0)
600 .await
601 .expect("Failed to read from blob");
602 assert_eq!(read.as_ref(), data);
603
604 blob.close().await.expect("Failed to close blob");
606
607 let blobs = context
609 .scan(partition)
610 .await
611 .expect("Failed to scan partition");
612 assert!(blobs.contains(&name.to_vec()));
613
614 let (blob, len) = context
616 .open(partition, name)
617 .await
618 .expect("Failed to reopen blob");
619 assert_eq!(len, data.len() as u64);
620
621 let read = blob
623 .read_at(vec![0u8; 7], 7)
624 .await
625 .expect("Failed to read data");
626 assert_eq!(read.as_ref(), b"Storage");
627
628 blob.close().await.expect("Failed to close blob");
630
631 context
633 .remove(partition, Some(name))
634 .await
635 .expect("Failed to remove blob");
636
637 let blobs = context
639 .scan(partition)
640 .await
641 .expect("Failed to scan partition");
642 assert!(!blobs.contains(&name.to_vec()));
643
644 context
646 .remove(partition, None)
647 .await
648 .expect("Failed to remove partition");
649
650 let result = context.scan(partition).await;
652 assert!(matches!(result, Err(Error::PartitionMissing(_))));
653 });
654 }
655
656 fn test_blob_read_write<R: Runner>(runner: R)
657 where
658 R::Context: Storage,
659 {
660 runner.start(|context| async move {
661 let partition = "test_partition";
662 let name = b"test_blob_rw";
663
664 let (blob, _) = context
666 .open(partition, name)
667 .await
668 .expect("Failed to open blob");
669
670 let data1 = b"Hello";
672 let data2 = b"World";
673 blob.write_at(Vec::from(data1), 0)
674 .await
675 .expect("Failed to write data1");
676 blob.write_at(Vec::from(data2), 5)
677 .await
678 .expect("Failed to write data2");
679
680 let read = blob
682 .read_at(vec![0u8; 10], 0)
683 .await
684 .expect("Failed to read data");
685 assert_eq!(&read.as_ref()[..5], data1);
686 assert_eq!(&read.as_ref()[5..], data2);
687
688 let data3 = b"Store";
690 blob.write_at(Vec::from(data3), 5)
691 .await
692 .expect("Failed to write data3");
693
694 blob.truncate(5).await.expect("Failed to truncate blob");
696 let read = blob
697 .read_at(vec![0; 5], 0)
698 .await
699 .expect("Failed to read data");
700 assert_eq!(&read.as_ref()[..5], data1);
701
702 let result = blob.read_at(vec![0u8; 10], 0).await;
704 assert!(result.is_err());
705
706 blob.close().await.expect("Failed to close blob");
708 });
709 }
710
711 fn test_many_partition_read_write<R: Runner>(runner: R)
712 where
713 R::Context: Storage,
714 {
715 runner.start(|context| async move {
716 let partitions = ["partition1", "partition2", "partition3"];
717 let name = b"test_blob_rw";
718 let data1 = b"Hello";
719 let data2 = b"World";
720
721 for (additional, partition) in partitions.iter().enumerate() {
722 let (blob, _) = context
724 .open(partition, name)
725 .await
726 .expect("Failed to open blob");
727
728 blob.write_at(Vec::from(data1), 0)
730 .await
731 .expect("Failed to write data1");
732 blob.write_at(Vec::from(data2), 5 + additional as u64)
733 .await
734 .expect("Failed to write data2");
735
736 blob.close().await.expect("Failed to close blob");
738 }
739
740 for (additional, partition) in partitions.iter().enumerate() {
741 let (blob, len) = context
743 .open(partition, name)
744 .await
745 .expect("Failed to open blob");
746 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
747
748 let read = blob
750 .read_at(vec![0u8; 10 + additional], 0)
751 .await
752 .expect("Failed to read data");
753 assert_eq!(&read.as_ref()[..5], b"Hello");
754 assert_eq!(&read.as_ref()[5 + additional..], b"World");
755
756 blob.close().await.expect("Failed to close blob");
758 }
759 });
760 }
761
762 fn test_blob_read_past_length<R: Runner>(runner: R)
763 where
764 R::Context: Storage,
765 {
766 runner.start(|context| async move {
767 let partition = "test_partition";
768 let name = b"test_blob_rw";
769
770 let (blob, _) = context
772 .open(partition, name)
773 .await
774 .expect("Failed to open blob");
775
776 let result = blob.read_at(vec![0u8; 10], 0).await;
778 assert!(result.is_err());
779
780 let data = b"Hello, Storage!".to_vec();
782 blob.write_at(data, 0)
783 .await
784 .expect("Failed to write to blob");
785
786 let result = blob.read_at(vec![0u8; 20], 0).await;
788 assert!(result.is_err());
789 })
790 }
791
792 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
793 where
794 R::Context: Spawner + Storage + Metrics,
795 {
796 runner.start(|context| async move {
797 let partition = "test_partition";
798 let name = b"test_blob_rw";
799
800 let (blob, _) = context
802 .open(partition, name)
803 .await
804 .expect("Failed to open blob");
805
806 let data = b"Hello, Storage!";
808 blob.write_at(Vec::from(data), 0)
809 .await
810 .expect("Failed to write to blob");
811
812 blob.sync().await.expect("Failed to sync blob");
814
815 let check1 = context.with_label("check1").spawn({
817 let blob = blob.clone();
818 move |_| async move {
819 let read = blob
820 .read_at(vec![0u8; data.len()], 0)
821 .await
822 .expect("Failed to read from blob");
823 assert_eq!(read.as_ref(), data);
824 }
825 });
826 let check2 = context.with_label("check2").spawn({
827 let blob = blob.clone();
828 move |_| async move {
829 let read = blob
830 .read_at(vec![0; data.len()], 0)
831 .await
832 .expect("Failed to read from blob");
833 assert_eq!(read.as_ref(), data);
834 }
835 });
836
837 let result = join!(check1, check2);
839 assert!(result.0.is_ok());
840 assert!(result.1.is_ok());
841
842 let read = blob
844 .read_at(vec![0; data.len()], 0)
845 .await
846 .expect("Failed to read from blob");
847 assert_eq!(read.as_ref(), data);
848
849 blob.close().await.expect("Failed to close blob");
851
852 let buffer = context.encode();
854 assert!(buffer.contains("open_blobs 0"));
855 });
856 }
857
858 fn test_shutdown<R: Runner>(runner: R)
859 where
860 R::Context: Spawner + Metrics + Clock,
861 {
862 let kill = 9;
863 runner.start(|context| async move {
864 let before = context
866 .with_label("before")
867 .spawn(move |context| async move {
868 let sig = context.stopped().await;
869 assert_eq!(sig.unwrap(), kill);
870 });
871
872 let after = context
874 .with_label("after")
875 .spawn(move |context| async move {
876 let mut signal = context.stopped();
878 loop {
879 select! {
880 sig = &mut signal => {
881 assert_eq!(sig.unwrap(), kill);
883 break;
884 },
885 _ = context.sleep(Duration::from_millis(10)) => {
886 },
888 }
889 }
890 });
891
892 context.sleep(Duration::from_millis(50)).await;
894
895 context.stop(kill);
897
898 let result = join!(before, after);
900 assert!(result.0.is_ok());
901 assert!(result.1.is_ok());
902 });
903 }
904
905 fn test_spawn_ref<R: Runner>(runner: R)
906 where
907 R::Context: Spawner,
908 {
909 runner.start(|mut context| async move {
910 let handle = context.spawn_ref();
911 let result = handle(async move { 42 }).await;
912 assert!(matches!(result, Ok(42)));
913 });
914 }
915
916 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
917 where
918 R::Context: Spawner,
919 {
920 runner.start(|mut context| async move {
921 let handle = context.spawn_ref();
922 let result = handle(async move { 42 }).await;
923 assert!(matches!(result, Ok(42)));
924
925 let handle = context.spawn_ref();
927 let result = handle(async move { 42 }).await;
928 assert!(matches!(result, Ok(42)));
929 });
930 }
931
932 fn test_spawn_duplicate<R: Runner>(runner: R)
933 where
934 R::Context: Spawner,
935 {
936 runner.start(|mut context| async move {
937 let handle = context.spawn_ref();
938 let result = handle(async move { 42 }).await;
939 assert!(matches!(result, Ok(42)));
940
941 context.spawn(|_| async move { 42 });
943 });
944 }
945
946 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
947 where
948 R::Context: Spawner,
949 {
950 runner.start(|context| async move {
951 let handle = context.spawn_blocking(dedicated, |_| 42);
952 let result = handle.await;
953 assert!(matches!(result, Ok(42)));
954 });
955 }
956
957 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
958 where
959 R::Context: Spawner,
960 {
961 runner.start(|mut context| async move {
962 let spawn = context.spawn_blocking_ref(dedicated);
963 let handle = spawn(|| 42);
964 let result = handle.await;
965 assert!(matches!(result, Ok(42)));
966 });
967 }
968
969 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
970 where
971 R::Context: Spawner,
972 {
973 runner.start(|mut context| async move {
974 let spawn = context.spawn_blocking_ref(dedicated);
975 let result = spawn(|| 42).await;
976 assert!(matches!(result, Ok(42)));
977
978 context.spawn_blocking(dedicated, |_| 42);
980 });
981 }
982
983 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
984 where
985 R::Context: Spawner,
986 {
987 runner.start(|context| async move {
988 let (sender, mut receiver) = oneshot::channel();
990 let handle = context.spawn_blocking(dedicated, move |_| {
991 loop {
993 if receiver.try_recv().is_ok() {
994 break;
995 }
996 }
997
998 let mut count = 0;
1000 loop {
1001 count += 1;
1002 if count >= 100_000_000 {
1003 break;
1004 }
1005 }
1006 count
1007 });
1008
1009 handle.abort();
1015 sender.send(()).unwrap();
1016
1017 assert!(matches!(handle.await, Ok(100_000_000)));
1019 });
1020 }
1021
1022 fn test_metrics<R: Runner>(runner: R)
1023 where
1024 R::Context: Metrics,
1025 {
1026 runner.start(|context| async move {
1027 assert_eq!(context.label(), "");
1029
1030 let counter = Counter::<u64>::default();
1032 context.register("test", "test", counter.clone());
1033
1034 counter.inc();
1036
1037 let buffer = context.encode();
1039 assert!(buffer.contains("test_total 1"));
1040
1041 let context = context.with_label("nested");
1043 let nested_counter = Counter::<u64>::default();
1044 context.register("test", "test", nested_counter.clone());
1045
1046 nested_counter.inc();
1048
1049 let buffer = context.encode();
1051 assert!(buffer.contains("nested_test_total 1"));
1052 assert!(buffer.contains("test_total 1"));
1053 });
1054 }
1055
1056 fn test_metrics_label<R: Runner>(runner: R)
1057 where
1058 R::Context: Metrics,
1059 {
1060 runner.start(|context| async move {
1061 context.with_label(METRICS_PREFIX);
1062 })
1063 }
1064
/// Runs `test_error_future` on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1070
/// Runs `test_clock_sleep` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1076
/// Runs `test_clock_sleep_until` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1082
/// Runs `test_root_finishes` on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1088
/// Runs `test_spawn_abort` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1094
/// Runs `test_panic_aborts_root` on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1100
/// Runs `test_panic_aborts_spawn` on the deterministic runtime, where the spawned
/// task's panic is expected to surface as a panic of the whole test.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1107
/// Runs `test_select` on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1113
/// Runs `test_select_loop` on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1119
/// Runs `test_storage_operations` on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1125
/// Runs `test_blob_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1131
/// Runs `test_many_partition_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1137
/// Runs `test_blob_read_past_length` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1143
/// Runs `test_blob_clone_and_concurrent_read` on the deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1150
/// Runs `test_shutdown` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1156
/// Runs `test_spawn_ref` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1162
/// Runs `test_spawn_ref_duplicate` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1169
/// Runs `test_spawn_duplicate` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1176
/// Runs `test_spawn_blocking` on a fresh deterministic runtime for each `dedicated` setting.
#[test]
fn test_deterministic_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    }
}
1184
/// Spawns a panicking blocking task on the deterministic runtime and expects the panic
/// message to surface from the test.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        deterministic::Runner::default().start(|context| async move {
            let task = context.spawn_blocking(dedicated, |_| {
                panic!("blocking task panicked");
            });
            task.await.unwrap();
        });
    }
}
1198
/// Runs `test_spawn_blocking_abort` on a fresh deterministic runtime for each
/// `dedicated` setting.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    for dedicated in [false, true] {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated);
    }
}
1206
/// Runs `test_spawn_blocking_ref` on a fresh deterministic runtime for each
/// `dedicated` setting.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated);
    }
}
1214
/// Runs `test_spawn_blocking_ref_duplicate` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref_duplicate(deterministic::Runner::default(), dedicated);
    }
}
1223
/// Runs `test_metrics` on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1229
/// Runs `test_metrics_label` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1236
/// Runs `test_error_future` on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1242
/// Runs `test_clock_sleep` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1248
/// Runs `test_clock_sleep_until` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1254
/// Runs `test_root_finishes` on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1260
/// Runs `test_spawn_abort` on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1266
/// Runs `test_panic_aborts_root` on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1272
/// Runs `test_panic_aborts_spawn` on the tokio runtime, where the spawned task's panic
/// is reported through its handle rather than aborting the test.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1278
/// Runs `test_select` on the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1284
/// Runs `test_select_loop` on the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1290
/// Runs `test_storage_operations` on the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1296
/// Runs `test_blob_read_write` on the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1302
/// Runs `test_many_partition_read_write` on the tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1308
/// Runs `test_blob_read_past_length` on the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1314
/// Runs `test_blob_clone_and_concurrent_read` on the tokio runtime.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1321
/// Runs `test_shutdown` on the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1327
/// Runs `test_spawn_ref` on the tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1333
/// Runs `test_spawn_ref_duplicate` on the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1340
/// Runs `test_spawn_duplicate` on the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1347
/// Runs `test_spawn_blocking` on a fresh tokio runtime for each `dedicated` setting.
#[test]
fn test_tokio_spawn_blocking() {
    for dedicated in [false, true] {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    }
}
1355
/// Spawns a panicking blocking task on the tokio runtime and expects its handle to
/// resolve to `Error::Exited`, for each `dedicated` setting.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        tokio::Runner::default().start(|context| async move {
            let task = context.spawn_blocking(dedicated, |_| {
                panic!("blocking task panicked");
            });
            assert!(matches!(task.await, Err(Error::Exited)));
        });
    }
}
1369
/// Runs `test_spawn_blocking_abort` on a fresh tokio runtime for each `dedicated` setting.
#[test]
fn test_tokio_spawn_blocking_abort() {
    for dedicated in [false, true] {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    }
}
1377
/// Runs `test_spawn_blocking_ref` on a fresh tokio runtime for each `dedicated` setting.
#[test]
fn test_tokio_spawn_blocking_ref() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    }
}
1385
/// Runs `test_spawn_blocking_ref_duplicate` on the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        test_spawn_blocking_ref_duplicate(tokio::Runner::default(), dedicated);
    }
}
1394
/// Runs `test_metrics` on the tokio runtime.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1400
/// Runs `test_metrics_label` on the tokio runtime and expects a panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1407
/// Initializes tokio telemetry, registers a counter, and scrapes `/metrics` over a raw
/// HTTP/1.1 request sent through the runtime's own `Network` API.
///
/// The response is parsed by three local helpers (status line, headers, body), and the
/// body is expected to contain the registered counter.
// NOTE(review): the address passed to `telemetry::init` is presumably where the metrics
// endpoint listens, since the client dials the same address — confirm against `init`.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Fixed local address shared by telemetry and the client.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry with INFO-level, non-JSON logging.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and bump it so the scrape has something to find.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads one line byte by byte, dropping the trailing `\n` and an optional `\r`.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads `Name: value` lines up to the first empty line; lines without a `": "`
        /// separator are skipped.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                // Split on the first separator only, so values may contain ": " themselves.
                if let Some((key, value)) = line.split_once(": ") {
                    headers.insert(key.to_string(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.as_ref().to_vec()).map_err(|_| Error::ReadFailed)
        }

        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial until it succeeds, pausing between attempts.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send the scrape request.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
                    address
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers; the lookup below relies on the lowercase `content-length` key.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {:?}", headers);
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Body: must include the counter registered above.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Wait for the client to finish.
        client_handle.await.unwrap();
    });
}
1516}