1use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23 future::Future,
24 net::SocketAddr,
25 time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32 if #[cfg(not(target_arch = "wasm32"))] {
33 pub mod tokio;
34 pub mod benchmarks;
35 }
36}
37mod network;
38mod storage;
39pub mod telemetry;
40mod utils;
41pub use utils::{
42 create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
43};
44
45const METRICS_PREFIX: &str = "runtime";
47
48#[derive(Error, Debug)]
50pub enum Error {
51 #[error("exited")]
52 Exited,
53 #[error("closed")]
54 Closed,
55 #[error("timeout")]
56 Timeout,
57 #[error("bind failed")]
58 BindFailed,
59 #[error("connection failed")]
60 ConnectionFailed,
61 #[error("write failed")]
62 WriteFailed,
63 #[error("read failed")]
64 ReadFailed,
65 #[error("send failed")]
66 SendFailed,
67 #[error("recv failed")]
68 RecvFailed,
69 #[error("partition creation failed: {0}")]
70 PartitionCreationFailed(String),
71 #[error("partition missing: {0}")]
72 PartitionMissing(String),
73 #[error("partition corrupt: {0}")]
74 PartitionCorrupt(String),
75 #[error("blob open failed: {0}/{1} error: {2}")]
76 BlobOpenFailed(String, String, IoError),
77 #[error("blob missing: {0}/{1}")]
78 BlobMissing(String, String),
79 #[error("blob truncate failed: {0}/{1} error: {2}")]
80 BlobTruncateFailed(String, String, IoError),
81 #[error("blob sync failed: {0}/{1} error: {2}")]
82 BlobSyncFailed(String, String, IoError),
83 #[error("blob close failed: {0}/{1} error: {2}")]
84 BlobCloseFailed(String, String, IoError),
85 #[error("blob insufficient length")]
86 BlobInsufficientLength,
87 #[error("offset overflow")]
88 OffsetOverflow,
89}
90
91pub trait Runner {
94 type Context;
100
101 fn start<F, Fut>(self, f: F) -> Fut::Output
103 where
104 F: FnOnce(Self::Context) -> Fut,
105 Fut: Future;
106}
107
108pub trait Spawner: Clone + Send + Sync + 'static {
110 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
118 where
119 F: FnOnce(Self) -> Fut + Send + 'static,
120 Fut: Future<Output = T> + Send + 'static,
121 T: Send + 'static;
122
123 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
136 where
137 F: Future<Output = T> + Send + 'static,
138 T: Send + 'static;
139
140 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
150 where
151 F: FnOnce() -> T + Send + 'static,
152 T: Send + 'static;
153
154 fn stop(&self, value: i32);
161
162 fn stopped(&self) -> Signal;
167}
168
169pub trait Metrics: Clone + Send + Sync + 'static {
171 fn label(&self) -> String;
173
174 fn with_label(&self, label: &str) -> Self;
182
183 fn scoped_label(&self, label: &str) -> String {
187 let label = if self.label().is_empty() {
188 label.to_string()
189 } else {
190 format!("{}_{}", self.label(), label)
191 };
192 assert!(
193 !label.starts_with(METRICS_PREFIX),
194 "using runtime label is not allowed"
195 );
196 label
197 }
198
199 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
203
204 fn encode(&self) -> String;
206}
207
208pub trait Clock: Clone + Send + Sync + 'static {
214 fn current(&self) -> SystemTime;
216
217 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
219
220 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
222}
223
224pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
226
227pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
229
230pub type ListenerOf<N> = <N as crate::Network>::Listener;
232
233pub trait Network: Clone + Send + Sync + 'static {
236 type Listener: Listener;
240
241 fn bind(
243 &self,
244 socket: SocketAddr,
245 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
246
247 fn dial(
249 &self,
250 socket: SocketAddr,
251 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
252}
253
254pub trait Listener: Sync + Send + 'static {
257 type Sink: Sink;
260 type Stream: Stream;
263
264 fn accept(
266 &mut self,
267 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
268
269 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
271}
272
273pub trait Sink: Sync + Send + 'static {
276 fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
278}
279
280pub trait Stream: Sync + Send + 'static {
283 fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
286}
287
288pub trait Storage: Clone + Send + Sync + 'static {
296 type Blob: Blob;
298
299 fn open(
305 &self,
306 partition: &str,
307 name: &[u8],
308 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
309
310 fn remove(
314 &self,
315 partition: &str,
316 name: Option<&[u8]>,
317 ) -> impl Future<Output = Result<(), Error>> + Send;
318
319 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
321}
322
323#[allow(clippy::len_without_is_empty)]
334pub trait Blob: Clone + Send + Sync + 'static {
335 fn read_at(
340 &self,
341 buf: &mut [u8],
342 offset: u64,
343 ) -> impl Future<Output = Result<(), Error>> + Send;
344
345 fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
347
348 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
350
351 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
353
354 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361 use commonware_macros::select;
362 use futures::channel::oneshot;
363 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
364 use prometheus_client::metrics::counter::Counter;
365 use std::collections::HashMap;
366 use std::panic::{catch_unwind, AssertUnwindSafe};
367 use std::str::FromStr;
368 use std::sync::Mutex;
369 use tracing::{error, Level};
370 use utils::reschedule;
371
372 fn test_error_future<R: Runner>(runner: R) {
373 async fn error_future() -> Result<&'static str, &'static str> {
374 Err("An error occurred")
375 }
376 let result = runner.start(|_| error_future());
377 assert_eq!(result, Err("An error occurred"));
378 }
379
380 fn test_clock_sleep<R: Runner>(runner: R)
381 where
382 R::Context: Spawner + Clock,
383 {
384 runner.start(|context| async move {
385 let start = context.current();
387 let sleep_duration = Duration::from_millis(10);
388 context.sleep(sleep_duration).await;
389
390 let end = context.current();
392 assert!(end.duration_since(start).unwrap() >= sleep_duration);
393 });
394 }
395
396 fn test_clock_sleep_until<R: Runner>(runner: R)
397 where
398 R::Context: Spawner + Clock,
399 {
400 runner.start(|context| async move {
401 let now = context.current();
403 context.sleep_until(now + Duration::from_millis(100)).await;
404
405 let elapsed = now.elapsed().unwrap();
407 assert!(elapsed >= Duration::from_millis(100));
408 });
409 }
410
411 fn test_root_finishes<R: Runner>(runner: R)
412 where
413 R::Context: Spawner,
414 {
415 runner.start(|context| async move {
416 context.spawn(|_| async move {
417 loop {
418 reschedule().await;
419 }
420 });
421 });
422 }
423
424 fn test_spawn_abort<R: Runner>(runner: R)
425 where
426 R::Context: Spawner,
427 {
428 runner.start(|context| async move {
429 let handle = context.spawn(|_| async move {
430 loop {
431 reschedule().await;
432 }
433 });
434 handle.abort();
435 assert!(matches!(handle.await, Err(Error::Closed)));
436 });
437 }
438
439 fn test_panic_aborts_root<R: Runner>(runner: R) {
440 let result = catch_unwind(AssertUnwindSafe(|| {
441 runner.start(|_| async move {
442 panic!("blah");
443 });
444 }));
445 result.unwrap_err();
446 }
447
448 fn test_panic_aborts_spawn<R: Runner>(runner: R)
449 where
450 R::Context: Spawner,
451 {
452 let result = runner.start(|context| async move {
453 let result = context.spawn(|_| async move {
454 panic!("blah");
455 });
456 assert!(matches!(result.await, Err(Error::Exited)));
457 Result::<(), Error>::Ok(())
458 });
459
460 result.unwrap();
462 }
463
464 fn test_select<R: Runner>(runner: R) {
465 runner.start(|_| async move {
466 let output = Mutex::new(0);
468 select! {
469 v1 = ready(1) => {
470 *output.lock().unwrap() = v1;
471 },
472 v2 = ready(2) => {
473 *output.lock().unwrap() = v2;
474 },
475 };
476 assert_eq!(*output.lock().unwrap(), 1);
477
478 select! {
480 v1 = std::future::pending::<i32>() => {
481 *output.lock().unwrap() = v1;
482 },
483 v2 = ready(2) => {
484 *output.lock().unwrap() = v2;
485 },
486 };
487 assert_eq!(*output.lock().unwrap(), 2);
488 });
489 }
490
491 fn test_select_loop<R: Runner>(runner: R)
493 where
494 R::Context: Clock,
495 {
496 runner.start(|context| async move {
497 let (mut sender, mut receiver) = mpsc::unbounded();
499 for _ in 0..2 {
500 select! {
501 v = receiver.next() => {
502 panic!("unexpected value: {:?}", v);
503 },
504 _ = context.sleep(Duration::from_millis(100)) => {
505 continue;
506 },
507 };
508 }
509
510 sender.send(0).await.unwrap();
512 sender.send(1).await.unwrap();
513
514 select! {
516 _ = async {} => {
517 },
519 v = receiver.next() => {
520 panic!("unexpected value: {:?}", v);
521 },
522 };
523
524 for i in 0..2 {
526 select! {
527 _ = context.sleep(Duration::from_millis(100)) => {
528 panic!("timeout");
529 },
530 v = receiver.next() => {
531 assert_eq!(v.unwrap(), i);
532 },
533 };
534 }
535 });
536 }
537
538 fn test_storage_operations<R: Runner>(runner: R)
539 where
540 R::Context: Storage,
541 {
542 runner.start(|context| async move {
543 let partition = "test_partition";
544 let name = b"test_blob";
545
546 let (blob, _) = context
548 .open(partition, name)
549 .await
550 .expect("Failed to open blob");
551
552 let data = b"Hello, Storage!";
554 blob.write_at(data, 0)
555 .await
556 .expect("Failed to write to blob");
557
558 blob.sync().await.expect("Failed to sync blob");
560
561 let mut buffer = vec![0u8; data.len()];
563 blob.read_at(&mut buffer, 0)
564 .await
565 .expect("Failed to read from blob");
566 assert_eq!(&buffer, data);
567
568 blob.close().await.expect("Failed to close blob");
570
571 let blobs = context
573 .scan(partition)
574 .await
575 .expect("Failed to scan partition");
576 assert!(blobs.contains(&name.to_vec()));
577
578 let (blob, len) = context
580 .open(partition, name)
581 .await
582 .expect("Failed to reopen blob");
583 assert_eq!(len, data.len() as u64);
584
585 let mut buffer = vec![0u8; 7];
587 blob.read_at(&mut buffer, 7)
588 .await
589 .expect("Failed to read data");
590 assert_eq!(&buffer, b"Storage");
591
592 blob.close().await.expect("Failed to close blob");
594
595 context
597 .remove(partition, Some(name))
598 .await
599 .expect("Failed to remove blob");
600
601 let blobs = context
603 .scan(partition)
604 .await
605 .expect("Failed to scan partition");
606 assert!(!blobs.contains(&name.to_vec()));
607
608 context
610 .remove(partition, None)
611 .await
612 .expect("Failed to remove partition");
613
614 let result = context.scan(partition).await;
616 assert!(matches!(result, Err(Error::PartitionMissing(_))));
617 });
618 }
619
620 fn test_blob_read_write<R: Runner>(runner: R)
621 where
622 R::Context: Storage,
623 {
624 runner.start(|context| async move {
625 let partition = "test_partition";
626 let name = b"test_blob_rw";
627
628 let (blob, _) = context
630 .open(partition, name)
631 .await
632 .expect("Failed to open blob");
633
634 let data1 = b"Hello";
636 let data2 = b"World";
637 blob.write_at(data1, 0)
638 .await
639 .expect("Failed to write data1");
640 blob.write_at(data2, 5)
641 .await
642 .expect("Failed to write data2");
643
644 let mut buffer = vec![0u8; 10];
646 blob.read_at(&mut buffer, 0)
647 .await
648 .expect("Failed to read data");
649 assert_eq!(&buffer[..5], data1);
650 assert_eq!(&buffer[5..], data2);
651
652 let data3 = b"Store";
654 blob.write_at(data3, 5)
655 .await
656 .expect("Failed to write data3");
657
658 blob.truncate(5).await.expect("Failed to truncate blob");
660 let mut buffer = vec![0u8; 5];
661 blob.read_at(&mut buffer, 0)
662 .await
663 .expect("Failed to read data");
664 assert_eq!(&buffer[..5], data1);
665
666 let mut buffer = vec![0u8; 10];
668 let result = blob.read_at(&mut buffer, 0).await;
669 assert!(result.is_err());
670
671 blob.close().await.expect("Failed to close blob");
673 });
674 }
675
676 fn test_many_partition_read_write<R: Runner>(runner: R)
677 where
678 R::Context: Storage,
679 {
680 runner.start(|context| async move {
681 let partitions = ["partition1", "partition2", "partition3"];
682 let name = b"test_blob_rw";
683 let data1 = b"Hello";
684 let data2 = b"World";
685
686 for (additional, partition) in partitions.iter().enumerate() {
687 let (blob, _) = context
689 .open(partition, name)
690 .await
691 .expect("Failed to open blob");
692
693 blob.write_at(data1, 0)
695 .await
696 .expect("Failed to write data1");
697 blob.write_at(data2, 5 + additional as u64)
698 .await
699 .expect("Failed to write data2");
700
701 blob.close().await.expect("Failed to close blob");
703 }
704
705 for (additional, partition) in partitions.iter().enumerate() {
706 let (blob, len) = context
708 .open(partition, name)
709 .await
710 .expect("Failed to open blob");
711 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
712
713 let mut buffer = vec![0u8; 10 + additional];
715 blob.read_at(&mut buffer, 0)
716 .await
717 .expect("Failed to read data");
718 assert_eq!(&buffer[..5], b"Hello");
719 assert_eq!(&buffer[5 + additional..], b"World");
720
721 blob.close().await.expect("Failed to close blob");
723 }
724 });
725 }
726
727 fn test_blob_read_past_length<R: Runner>(runner: R)
728 where
729 R::Context: Storage,
730 {
731 runner.start(|context| async move {
732 let partition = "test_partition";
733 let name = b"test_blob_rw";
734
735 let (blob, _) = context
737 .open(partition, name)
738 .await
739 .expect("Failed to open blob");
740
741 let mut buffer = vec![0u8; 10];
743 let result = blob.read_at(&mut buffer, 0).await;
744 assert!(result.is_err());
745
746 let data = b"Hello, Storage!";
748 blob.write_at(data, 0)
749 .await
750 .expect("Failed to write to blob");
751
752 let mut buffer = vec![0u8; 20];
754 let result = blob.read_at(&mut buffer, 0).await;
755 assert!(result.is_err());
756 })
757 }
758
759 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
760 where
761 R::Context: Spawner + Storage + Metrics,
762 {
763 runner.start(|context| async move {
764 let partition = "test_partition";
765 let name = b"test_blob_rw";
766
767 let (blob, _) = context
769 .open(partition, name)
770 .await
771 .expect("Failed to open blob");
772
773 let data = b"Hello, Storage!";
775 blob.write_at(data, 0)
776 .await
777 .expect("Failed to write to blob");
778
779 blob.sync().await.expect("Failed to sync blob");
781
782 let check1 = context.with_label("check1").spawn({
784 let blob = blob.clone();
785 move |_| async move {
786 let mut buffer = vec![0u8; data.len()];
787 blob.read_at(&mut buffer, 0)
788 .await
789 .expect("Failed to read from blob");
790 assert_eq!(&buffer, data);
791 }
792 });
793 let check2 = context.with_label("check2").spawn({
794 let blob = blob.clone();
795 move |_| async move {
796 let mut buffer = vec![0u8; data.len()];
797 blob.read_at(&mut buffer, 0)
798 .await
799 .expect("Failed to read from blob");
800 assert_eq!(&buffer, data);
801 }
802 });
803
804 let result = join!(check1, check2);
806 assert!(result.0.is_ok());
807 assert!(result.1.is_ok());
808
809 let mut buffer = vec![0u8; data.len()];
811 blob.read_at(&mut buffer, 0)
812 .await
813 .expect("Failed to read from blob");
814 assert_eq!(&buffer, data);
815
816 blob.close().await.expect("Failed to close blob");
818
819 let buffer = context.encode();
821 assert!(buffer.contains("open_blobs 0"));
822 });
823 }
824
825 fn test_shutdown<R: Runner>(runner: R)
826 where
827 R::Context: Spawner + Metrics + Clock,
828 {
829 let kill = 9;
830 runner.start(|context| async move {
831 let before = context
833 .with_label("before")
834 .spawn(move |context| async move {
835 let sig = context.stopped().await;
836 assert_eq!(sig.unwrap(), kill);
837 });
838
839 let after = context
841 .with_label("after")
842 .spawn(move |context| async move {
843 let mut signal = context.stopped();
845 loop {
846 select! {
847 sig = &mut signal => {
848 assert_eq!(sig.unwrap(), kill);
850 break;
851 },
852 _ = context.sleep(Duration::from_millis(10)) => {
853 },
855 }
856 }
857 });
858
859 context.sleep(Duration::from_millis(50)).await;
861
862 context.stop(kill);
864
865 let result = join!(before, after);
867 assert!(result.0.is_ok());
868 assert!(result.1.is_ok());
869 });
870 }
871
872 fn test_spawn_ref<R: Runner>(runner: R)
873 where
874 R::Context: Spawner,
875 {
876 runner.start(|mut context| async move {
877 let handle = context.spawn_ref();
878 let result = handle(async move { 42 }).await;
879 assert!(matches!(result, Ok(42)));
880 });
881 }
882
883 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
884 where
885 R::Context: Spawner,
886 {
887 runner.start(|mut context| async move {
888 let handle = context.spawn_ref();
889 let result = handle(async move { 42 }).await;
890 assert!(matches!(result, Ok(42)));
891
892 let handle = context.spawn_ref();
894 let result = handle(async move { 42 }).await;
895 assert!(matches!(result, Ok(42)));
896 });
897 }
898
899 fn test_spawn_duplicate<R: Runner>(runner: R)
900 where
901 R::Context: Spawner,
902 {
903 runner.start(|mut context| async move {
904 let handle = context.spawn_ref();
905 let result = handle(async move { 42 }).await;
906 assert!(matches!(result, Ok(42)));
907
908 context.spawn(|_| async move { 42 });
910 });
911 }
912
913 fn test_spawn_blocking<R: Runner>(runner: R)
914 where
915 R::Context: Spawner,
916 {
917 runner.start(|context| async move {
918 let handle = context.spawn_blocking(|| 42);
919 let result = handle.await;
920 assert!(matches!(result, Ok(42)));
921 });
922 }
923
924 fn test_spawn_blocking_abort<R: Runner>(runner: R)
925 where
926 R::Context: Spawner,
927 {
928 runner.start(|context| async move {
929 let (sender, mut receiver) = oneshot::channel();
931 let handle = context.spawn_blocking(move || {
932 loop {
934 if receiver.try_recv().is_ok() {
935 break;
936 }
937 }
938
939 let mut count = 0;
941 loop {
942 count += 1;
943 if count >= 100_000_000 {
944 break;
945 }
946 }
947 count
948 });
949
950 handle.abort();
956 sender.send(()).unwrap();
957
958 assert!(matches!(handle.await, Ok(100_000_000)));
960 });
961 }
962
963 fn test_metrics<R: Runner>(runner: R)
964 where
965 R::Context: Metrics,
966 {
967 runner.start(|context| async move {
968 assert_eq!(context.label(), "");
970
971 let counter = Counter::<u64>::default();
973 context.register("test", "test", counter.clone());
974
975 counter.inc();
977
978 let buffer = context.encode();
980 assert!(buffer.contains("test_total 1"));
981
982 let context = context.with_label("nested");
984 let nested_counter = Counter::<u64>::default();
985 context.register("test", "test", nested_counter.clone());
986
987 nested_counter.inc();
989
990 let buffer = context.encode();
992 assert!(buffer.contains("nested_test_total 1"));
993 assert!(buffer.contains("test_total 1"));
994 });
995 }
996
997 fn test_metrics_label<R: Runner>(runner: R)
998 where
999 R::Context: Metrics,
1000 {
1001 runner.start(|context| async move {
1002 context.with_label(METRICS_PREFIX);
1003 })
1004 }
1005
1006 #[test]
1007 fn test_deterministic_future() {
1008 let runner = deterministic::Runner::default();
1009 test_error_future(runner);
1010 }
1011
1012 #[test]
1013 fn test_deterministic_clock_sleep() {
1014 let executor = deterministic::Runner::default();
1015 test_clock_sleep(executor);
1016 }
1017
1018 #[test]
1019 fn test_deterministic_clock_sleep_until() {
1020 let executor = deterministic::Runner::default();
1021 test_clock_sleep_until(executor);
1022 }
1023
1024 #[test]
1025 fn test_deterministic_root_finishes() {
1026 let executor = deterministic::Runner::default();
1027 test_root_finishes(executor);
1028 }
1029
1030 #[test]
1031 fn test_deterministic_spawn_abort() {
1032 let executor = deterministic::Runner::default();
1033 test_spawn_abort(executor);
1034 }
1035
1036 #[test]
1037 fn test_deterministic_panic_aborts_root() {
1038 let runner = deterministic::Runner::default();
1039 test_panic_aborts_root(runner);
1040 }
1041
1042 #[test]
1043 #[should_panic(expected = "blah")]
1044 fn test_deterministic_panic_aborts_spawn() {
1045 let executor = deterministic::Runner::default();
1046 test_panic_aborts_spawn(executor);
1047 }
1048
1049 #[test]
1050 fn test_deterministic_select() {
1051 let executor = deterministic::Runner::default();
1052 test_select(executor);
1053 }
1054
1055 #[test]
1056 fn test_deterministic_select_loop() {
1057 let executor = deterministic::Runner::default();
1058 test_select_loop(executor);
1059 }
1060
1061 #[test]
1062 fn test_deterministic_storage_operations() {
1063 let executor = deterministic::Runner::default();
1064 test_storage_operations(executor);
1065 }
1066
1067 #[test]
1068 fn test_deterministic_blob_read_write() {
1069 let executor = deterministic::Runner::default();
1070 test_blob_read_write(executor);
1071 }
1072
1073 #[test]
1074 fn test_deterministic_many_partition_read_write() {
1075 let executor = deterministic::Runner::default();
1076 test_many_partition_read_write(executor);
1077 }
1078
1079 #[test]
1080 fn test_deterministic_blob_read_past_length() {
1081 let executor = deterministic::Runner::default();
1082 test_blob_read_past_length(executor);
1083 }
1084
1085 #[test]
1086 fn test_deterministic_blob_clone_and_concurrent_read() {
1087 let executor = deterministic::Runner::default();
1089 test_blob_clone_and_concurrent_read(executor);
1090 }
1091
1092 #[test]
1093 fn test_deterministic_shutdown() {
1094 let executor = deterministic::Runner::default();
1095 test_shutdown(executor);
1096 }
1097
1098 #[test]
1099 fn test_deterministic_spawn_ref() {
1100 let executor = deterministic::Runner::default();
1101 test_spawn_ref(executor);
1102 }
1103
1104 #[test]
1105 #[should_panic]
1106 fn test_deterministic_spawn_ref_duplicate() {
1107 let executor = deterministic::Runner::default();
1108 test_spawn_ref_duplicate(executor);
1109 }
1110
1111 #[test]
1112 #[should_panic]
1113 fn test_deterministic_spawn_duplicate() {
1114 let executor = deterministic::Runner::default();
1115 test_spawn_duplicate(executor);
1116 }
1117
1118 #[test]
1119 fn test_deterministic_spawn_blocking() {
1120 let executor = deterministic::Runner::default();
1121 test_spawn_blocking(executor);
1122 }
1123
1124 #[test]
1125 #[should_panic(expected = "blocking task panicked")]
1126 fn test_deterministic_spawn_blocking_panic() {
1127 let executor = deterministic::Runner::default();
1128 executor.start(|context| async move {
1129 let handle = context.spawn_blocking(|| {
1130 panic!("blocking task panicked");
1131 });
1132 handle.await.unwrap();
1133 });
1134 }
1135
1136 #[test]
1137 fn test_deterministic_spawn_blocking_abort() {
1138 let executor = deterministic::Runner::default();
1139 test_spawn_blocking_abort(executor);
1140 }
1141
1142 #[test]
1143 fn test_deterministic_metrics() {
1144 let executor = deterministic::Runner::default();
1145 test_metrics(executor);
1146 }
1147
1148 #[test]
1149 #[should_panic]
1150 fn test_deterministic_metrics_label() {
1151 let executor = deterministic::Runner::default();
1152 test_metrics_label(executor);
1153 }
1154
1155 #[test]
1156 fn test_tokio_error_future() {
1157 let runner = tokio::Runner::default();
1158 test_error_future(runner);
1159 }
1160
1161 #[test]
1162 fn test_tokio_clock_sleep() {
1163 let executor = tokio::Runner::default();
1164 test_clock_sleep(executor);
1165 }
1166
1167 #[test]
1168 fn test_tokio_clock_sleep_until() {
1169 let executor = tokio::Runner::default();
1170 test_clock_sleep_until(executor);
1171 }
1172
1173 #[test]
1174 fn test_tokio_root_finishes() {
1175 let executor = tokio::Runner::default();
1176 test_root_finishes(executor);
1177 }
1178
1179 #[test]
1180 fn test_tokio_spawn_abort() {
1181 let executor = tokio::Runner::default();
1182 test_spawn_abort(executor);
1183 }
1184
1185 #[test]
1186 fn test_tokio_panic_aborts_root() {
1187 let executor = tokio::Runner::default();
1188 test_panic_aborts_root(executor);
1189 }
1190
1191 #[test]
1192 fn test_tokio_panic_aborts_spawn() {
1193 let executor = tokio::Runner::default();
1194 test_panic_aborts_spawn(executor);
1195 }
1196
1197 #[test]
1198 fn test_tokio_select() {
1199 let executor = tokio::Runner::default();
1200 test_select(executor);
1201 }
1202
1203 #[test]
1204 fn test_tokio_select_loop() {
1205 let executor = tokio::Runner::default();
1206 test_select_loop(executor);
1207 }
1208
1209 #[test]
1210 fn test_tokio_storage_operations() {
1211 let executor = tokio::Runner::default();
1212 test_storage_operations(executor);
1213 }
1214
1215 #[test]
1216 fn test_tokio_blob_read_write() {
1217 let executor = tokio::Runner::default();
1218 test_blob_read_write(executor);
1219 }
1220
1221 #[test]
1222 fn test_tokio_many_partition_read_write() {
1223 let executor = tokio::Runner::default();
1224 test_many_partition_read_write(executor);
1225 }
1226
1227 #[test]
1228 fn test_tokio_blob_read_past_length() {
1229 let executor = tokio::Runner::default();
1230 test_blob_read_past_length(executor);
1231 }
1232
1233 #[test]
1234 fn test_tokio_blob_clone_and_concurrent_read() {
1235 let executor = tokio::Runner::default();
1237 test_blob_clone_and_concurrent_read(executor);
1238 }
1239
1240 #[test]
1241 fn test_tokio_shutdown() {
1242 let executor = tokio::Runner::default();
1243 test_shutdown(executor);
1244 }
1245
1246 #[test]
1247 fn test_tokio_spawn_ref() {
1248 let executor = tokio::Runner::default();
1249 test_spawn_ref(executor);
1250 }
1251
1252 #[test]
1253 #[should_panic]
1254 fn test_tokio_spawn_ref_duplicate() {
1255 let executor = tokio::Runner::default();
1256 test_spawn_ref_duplicate(executor);
1257 }
1258
1259 #[test]
1260 #[should_panic]
1261 fn test_tokio_spawn_duplicate() {
1262 let executor = tokio::Runner::default();
1263 test_spawn_duplicate(executor);
1264 }
1265
1266 #[test]
1267 fn test_tokio_spawn_blocking() {
1268 let executor = tokio::Runner::default();
1269 test_spawn_blocking(executor);
1270 }
1271
1272 #[test]
1273 fn test_tokio_spawn_blocking_panic() {
1274 let executor = tokio::Runner::default();
1275 executor.start(|context| async move {
1276 let handle = context.spawn_blocking(|| {
1277 panic!("blocking task panicked");
1278 });
1279 let result = handle.await;
1280 assert!(matches!(result, Err(Error::Exited)));
1281 });
1282 }
1283
1284 #[test]
1285 fn test_tokio_spawn_blocking_abort() {
1286 let executor = tokio::Runner::default();
1287 test_spawn_blocking_abort(executor);
1288 }
1289
1290 #[test]
1291 fn test_tokio_metrics() {
1292 let executor = tokio::Runner::default();
1293 test_metrics(executor);
1294 }
1295
1296 #[test]
1297 #[should_panic]
1298 fn test_tokio_metrics_label() {
1299 let executor = tokio::Runner::default();
1300 test_metrics_label(executor);
1301 }
1302
1303 #[test]
1304 fn test_tokio_telemetry() {
1305 let executor = tokio::Runner::default();
1306 executor.start(|context| async move {
1307 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1309
1310 tokio::telemetry::init(
1312 context.with_label("metrics"),
1313 tokio::telemetry::Logging {
1314 level: Level::INFO,
1315 json: false,
1316 },
1317 Some(address),
1318 None,
1319 );
1320
1321 let counter: Counter<u64> = Counter::default();
1323 context.register("test_counter", "Test counter", counter.clone());
1324 counter.inc();
1325
1326 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1328 let mut line = Vec::new();
1329 loop {
1330 let mut byte = [0; 1];
1331 stream.recv(&mut byte).await?;
1332 if byte[0] == b'\n' {
1333 if line.last() == Some(&b'\r') {
1334 line.pop(); }
1336 break;
1337 }
1338 line.push(byte[0]);
1339 }
1340 String::from_utf8(line).map_err(|_| Error::ReadFailed)
1341 }
1342
1343 async fn read_headers<St: Stream>(
1344 stream: &mut St,
1345 ) -> Result<HashMap<String, String>, Error> {
1346 let mut headers = HashMap::new();
1347 loop {
1348 let line = read_line(stream).await?;
1349 if line.is_empty() {
1350 break;
1351 }
1352 let parts: Vec<&str> = line.splitn(2, ": ").collect();
1353 if parts.len() == 2 {
1354 headers.insert(parts[0].to_string(), parts[1].to_string());
1355 }
1356 }
1357 Ok(headers)
1358 }
1359
1360 async fn read_body<St: Stream>(
1361 stream: &mut St,
1362 content_length: usize,
1363 ) -> Result<String, Error> {
1364 let mut body = vec![0; content_length];
1365 stream.recv(&mut body).await?;
1366 String::from_utf8(body).map_err(|_| Error::ReadFailed)
1367 }
1368
1369 let client_handle = context
1371 .with_label("client")
1372 .spawn(move |context| async move {
1373 let (mut sink, mut stream) = loop {
1374 match context.dial(address).await {
1375 Ok((sink, stream)) => break (sink, stream),
1376 Err(e) => {
1377 error!(err =?e, "failed to connect");
1379 context.sleep(Duration::from_millis(10)).await;
1380 }
1381 }
1382 };
1383
1384 let request = format!(
1386 "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1387 address
1388 );
1389 sink.send(request.as_bytes()).await.unwrap();
1390
1391 let status_line = read_line(&mut stream).await.unwrap();
1393 assert_eq!(status_line, "HTTP/1.1 200 OK");
1394
1395 let headers = read_headers(&mut stream).await.unwrap();
1397 println!("Headers: {:?}", headers);
1398 let content_length = headers
1399 .get("content-length")
1400 .unwrap()
1401 .parse::<usize>()
1402 .unwrap();
1403
1404 let body = read_body(&mut stream, content_length).await.unwrap();
1406 assert!(body.contains("test_counter_total 1"));
1407 });
1408
1409 client_handle.await.unwrap();
1411 });
1412 }
1413}