1use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23 future::Future,
24 io::Error as IoError,
25 net::SocketAddr,
26 time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36 if #[cfg(not(target_arch = "wasm32"))] {
37 pub mod tokio;
38 pub mod benchmarks;
39 }
40}
41mod network;
42mod storage;
43pub mod telemetry;
44mod utils;
45pub use utils::*;
46#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
47mod iouring;
48
/// Prefix that caller-supplied metric labels may not start with.
///
/// `Metrics::scoped_label` asserts that the label it computes does not begin
/// with this value.
const METRICS_PREFIX: &str = "runtime";
51
/// Errors that can occur when interacting with the runtime.
#[derive(Error, Debug)]
pub enum Error {
    /// A task exited without producing a value. The tests in this file observe
    /// it when awaiting the handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// A task or resource was closed. The tests in this file observe it when
    /// awaiting a handle after calling `abort` on it.
    #[error("closed")]
    Closed,
    /// An operation timed out.
    #[error("timeout")]
    Timeout,
    /// Binding a listener failed.
    #[error("bind failed")]
    BindFailed,
    /// Establishing a connection failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed. The telemetry test in this file also uses it for
    /// responses that are not valid UTF-8.
    #[error("read failed")]
    ReadFailed,
    /// A send failed.
    #[error("send failed")]
    SendFailed,
    /// A receive failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created; the payload is rendered after the
    /// message (presumably the partition name; confirm against the storage
    /// implementations).
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// The partition does not exist. The tests in this file observe it when
    /// scanning a partition that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// The partition is corrupt.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed; rendered as `{0}/{1}` followed by the underlying
    /// I/O error.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// The blob does not exist; rendered as `{0}/{1}`.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Resizing a blob failed; rendered as `{0}/{1}` followed by the underlying
    /// I/O error.
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    /// Syncing a blob failed; rendered as `{0}/{1}` followed by the underlying
    /// I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed; rendered as `{0}/{1}` followed by the underlying
    /// I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// The blob is shorter than the operation requires.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
    /// Any other I/O error. `#[from]` lets `?` convert a `std::io::Error` into
    /// this variant.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
96
/// Entry point for executing a root task on a runtime.
pub trait Runner {
    /// Context type passed to the root task (the argument of the closure
    /// given to `start`).
    type Context;

    /// Consumes the runner, calls `f` with a `Self::Context`, and returns the
    /// output of the future that `f` produces.
    ///
    /// The tests in this file show that a panic in the root future propagates
    /// out of `start`.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
113
/// Interface for spawning tasks and coordinating shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Consumes the context, passes it to `f`, and runs the returned future as
    /// a task. The returned `Handle` can be awaited for the task's result or
    /// aborted.
    ///
    /// The tests in this file expect an aborted task's handle to resolve to
    /// `Error::Closed`.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that spawns a future as a task without consuming the
    /// context.
    ///
    /// The tests in this file expect a panic when a context that has already
    /// spawned through `spawn_ref` is used to spawn again (via `spawn_ref` or
    /// `spawn`).
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Consumes the context and runs the synchronous closure `f` as a task.
    ///
    /// NOTE(review): `dedicated` presumably selects a dedicated thread rather
    /// than a shared blocking pool; confirm against the implementations.
    ///
    /// The tests in this file expect that calling `abort` on the returned
    /// handle does not stop the closure: it still runs to completion and the
    /// handle yields its value.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that runs a synchronous closure as a task without
    /// consuming the context.
    ///
    /// The tests in this file expect a panic when the same context is used to
    /// spawn again afterwards.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals shutdown with `value`. In the tests in this file, tasks waiting
    /// on `stopped` then receive that same value.
    fn stop(&self, value: i32);

    /// Returns a `Signal` that resolves once `stop` has been called. The tests
    /// in this file both await it directly and poll it by mutable reference
    /// inside `select!`.
    fn stopped(&self) -> Signal;
}
195
196pub trait Metrics: Clone + Send + Sync + 'static {
198 fn label(&self) -> String;
200
201 fn with_label(&self, label: &str) -> Self;
209
210 fn scoped_label(&self, label: &str) -> String {
214 let label = if self.label().is_empty() {
215 label.to_string()
216 } else {
217 format!("{}_{}", self.label(), label)
218 };
219 assert!(
220 !label.starts_with(METRICS_PREFIX),
221 "using runtime label is not allowed"
222 );
223 label
224 }
225
226 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
230
231 fn encode(&self) -> String;
233}
234
/// Interface for reading the runtime's time and sleeping.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by the runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`. The tests in this
    /// file expect `current` to have advanced by at least `duration` once it
    /// completes.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
250
/// The `Sink` type of the listener associated with network `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
253
/// The `Stream` type of the listener associated with network `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
256
/// The `Listener` type associated with network `N`.
pub type ListenerOf<N> = <N as crate::Network>::Listener;
259
/// Interface for binding to a socket address and dialing remote addresses.
pub trait Network: Clone + Send + Sync + 'static {
    /// Listener type returned by `bind`; it also fixes the `Sink` and `Stream`
    /// types produced by `dial`.
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener for incoming connections.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Connects to `socket` and resolves to the sink/stream pair for the new
    /// connection.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
280
/// Interface for accepting incoming connections.
pub trait Listener: Sync + Send + 'static {
    /// Write half of an accepted connection.
    type Sink: Sink;
    /// Read half of an accepted connection.
    type Stream: Stream;

    /// Resolves to the peer's address together with the sink/stream pair for
    /// the next incoming connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the local address of this listener.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
299
/// Interface for sending bytes over a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends `msg`, resolving to an error if the send fails.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
309
/// Interface for receiving bytes over a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives into `buf` and resolves to the buffer once done.
    ///
    /// The telemetry test in this file relies on the returned buffer holding
    /// as many bytes as the buffer passed in (it reads bodies of a known
    /// length in one call). NOTE(review): confirm against the implementations
    /// that `recv` always fills the whole buffer.
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
320
/// Interface for opening, removing, and listing blobs grouped into partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob type returned by `open`.
    type Blob: Blob;

    /// Opens the blob `name` in `partition` and resolves to the blob together
    /// with its current length in bytes.
    ///
    /// The tests in this file open blobs that were never explicitly created,
    /// so a missing blob is expected to be created on open.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition`, or the whole partition when
    /// `name` is `None`.
    ///
    /// The tests in this file expect `scan` to fail with
    /// `Error::PartitionMissing` after the partition has been removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resolves to the names of all blobs in `partition`.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
355
356#[allow(clippy::len_without_is_empty)]
367pub trait Blob: Clone + Send + Sync + 'static {
368 fn read_at(
373 &self,
374 buf: impl Into<StableBuf> + Send,
375 offset: u64,
376 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
377
378 fn write_at(
380 &self,
381 buf: impl Into<StableBuf> + Send,
382 offset: u64,
383 ) -> impl Future<Output = Result<(), Error>> + Send;
384
385 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
390
391 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
393
394 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
396}
397
398#[cfg(test)]
399mod tests {
400 use super::*;
401 use bytes::Bytes;
402 use commonware_macros::select;
403 use futures::{
404 channel::{mpsc, oneshot},
405 future::ready,
406 join, SinkExt, StreamExt,
407 };
408 use prometheus_client::metrics::counter::Counter;
409 use std::{
410 collections::HashMap,
411 panic::{catch_unwind, AssertUnwindSafe},
412 str::FromStr,
413 sync::Mutex,
414 };
415 use tracing::{error, Level};
416 use utils::reschedule;
417
418 fn test_error_future<R: Runner>(runner: R) {
419 async fn error_future() -> Result<&'static str, &'static str> {
420 Err("An error occurred")
421 }
422 let result = runner.start(|_| error_future());
423 assert_eq!(result, Err("An error occurred"));
424 }
425
426 fn test_clock_sleep<R: Runner>(runner: R)
427 where
428 R::Context: Spawner + Clock,
429 {
430 runner.start(|context| async move {
431 let start = context.current();
433 let sleep_duration = Duration::from_millis(10);
434 context.sleep(sleep_duration).await;
435
436 let end = context.current();
438 assert!(end.duration_since(start).unwrap() >= sleep_duration);
439 });
440 }
441
442 fn test_clock_sleep_until<R: Runner>(runner: R)
443 where
444 R::Context: Spawner + Clock,
445 {
446 runner.start(|context| async move {
447 let now = context.current();
449 context.sleep_until(now + Duration::from_millis(100)).await;
450
451 let elapsed = now.elapsed().unwrap();
453 assert!(elapsed >= Duration::from_millis(100));
454 });
455 }
456
457 fn test_root_finishes<R: Runner>(runner: R)
458 where
459 R::Context: Spawner,
460 {
461 runner.start(|context| async move {
462 context.spawn(|_| async move {
463 loop {
464 reschedule().await;
465 }
466 });
467 });
468 }
469
470 fn test_spawn_abort<R: Runner>(runner: R)
471 where
472 R::Context: Spawner,
473 {
474 runner.start(|context| async move {
475 let handle = context.spawn(|_| async move {
476 loop {
477 reschedule().await;
478 }
479 });
480 handle.abort();
481 assert!(matches!(handle.await, Err(Error::Closed)));
482 });
483 }
484
485 fn test_panic_aborts_root<R: Runner>(runner: R) {
486 let result = catch_unwind(AssertUnwindSafe(|| {
487 runner.start(|_| async move {
488 panic!("blah");
489 });
490 }));
491 result.unwrap_err();
492 }
493
494 fn test_panic_aborts_spawn<R: Runner>(runner: R)
495 where
496 R::Context: Spawner,
497 {
498 let result = runner.start(|context| async move {
499 let result = context.spawn(|_| async move {
500 panic!("blah");
501 });
502 assert!(matches!(result.await, Err(Error::Exited)));
503 Result::<(), Error>::Ok(())
504 });
505
506 result.unwrap();
508 }
509
510 fn test_select<R: Runner>(runner: R) {
511 runner.start(|_| async move {
512 let output = Mutex::new(0);
514 select! {
515 v1 = ready(1) => {
516 *output.lock().unwrap() = v1;
517 },
518 v2 = ready(2) => {
519 *output.lock().unwrap() = v2;
520 },
521 };
522 assert_eq!(*output.lock().unwrap(), 1);
523
524 select! {
526 v1 = std::future::pending::<i32>() => {
527 *output.lock().unwrap() = v1;
528 },
529 v2 = ready(2) => {
530 *output.lock().unwrap() = v2;
531 },
532 };
533 assert_eq!(*output.lock().unwrap(), 2);
534 });
535 }
536
537 fn test_select_loop<R: Runner>(runner: R)
539 where
540 R::Context: Clock,
541 {
542 runner.start(|context| async move {
543 let (mut sender, mut receiver) = mpsc::unbounded();
545 for _ in 0..2 {
546 select! {
547 v = receiver.next() => {
548 panic!("unexpected value: {v:?}");
549 },
550 _ = context.sleep(Duration::from_millis(100)) => {
551 continue;
552 },
553 };
554 }
555
556 sender.send(0).await.unwrap();
558 sender.send(1).await.unwrap();
559
560 select! {
562 _ = async {} => {
563 },
565 v = receiver.next() => {
566 panic!("unexpected value: {v:?}");
567 },
568 };
569
570 for i in 0..2 {
572 select! {
573 _ = context.sleep(Duration::from_millis(100)) => {
574 panic!("timeout");
575 },
576 v = receiver.next() => {
577 assert_eq!(v.unwrap(), i);
578 },
579 };
580 }
581 });
582 }
583
584 fn test_storage_operations<R: Runner>(runner: R)
585 where
586 R::Context: Storage,
587 {
588 runner.start(|context| async move {
589 let partition = "test_partition";
590 let name = b"test_blob";
591
592 let (blob, _) = context
594 .open(partition, name)
595 .await
596 .expect("Failed to open blob");
597
598 let data = b"Hello, Storage!";
600 blob.write_at(Vec::from(data), 0)
601 .await
602 .expect("Failed to write to blob");
603
604 blob.sync().await.expect("Failed to sync blob");
606
607 let read = blob
609 .read_at(vec![0; data.len()], 0)
610 .await
611 .expect("Failed to read from blob");
612 assert_eq!(read.as_ref(), data);
613
614 blob.close().await.expect("Failed to close blob");
616
617 let blobs = context
619 .scan(partition)
620 .await
621 .expect("Failed to scan partition");
622 assert!(blobs.contains(&name.to_vec()));
623
624 let (blob, len) = context
626 .open(partition, name)
627 .await
628 .expect("Failed to reopen blob");
629 assert_eq!(len, data.len() as u64);
630
631 let read = blob
633 .read_at(vec![0u8; 7], 7)
634 .await
635 .expect("Failed to read data");
636 assert_eq!(read.as_ref(), b"Storage");
637
638 blob.close().await.expect("Failed to close blob");
640
641 context
643 .remove(partition, Some(name))
644 .await
645 .expect("Failed to remove blob");
646
647 let blobs = context
649 .scan(partition)
650 .await
651 .expect("Failed to scan partition");
652 assert!(!blobs.contains(&name.to_vec()));
653
654 context
656 .remove(partition, None)
657 .await
658 .expect("Failed to remove partition");
659
660 let result = context.scan(partition).await;
662 assert!(matches!(result, Err(Error::PartitionMissing(_))));
663 });
664 }
665
666 fn test_blob_read_write<R: Runner>(runner: R)
667 where
668 R::Context: Storage,
669 {
670 runner.start(|context| async move {
671 let partition = "test_partition";
672 let name = b"test_blob_rw";
673
674 let (blob, _) = context
676 .open(partition, name)
677 .await
678 .expect("Failed to open blob");
679
680 let data1 = b"Hello";
682 let data2 = b"World";
683 blob.write_at(Vec::from(data1), 0)
684 .await
685 .expect("Failed to write data1");
686 blob.write_at(Vec::from(data2), 5)
687 .await
688 .expect("Failed to write data2");
689
690 let read = blob
692 .read_at(vec![0u8; 10], 0)
693 .await
694 .expect("Failed to read data");
695 assert_eq!(&read.as_ref()[..5], data1);
696 assert_eq!(&read.as_ref()[5..], data2);
697
698 let result = blob.read_at(vec![0u8; 10], 10).await;
700 assert!(result.is_err());
701
702 let data3 = b"Store";
704 blob.write_at(Vec::from(data3), 5)
705 .await
706 .expect("Failed to write data3");
707
708 let read = blob
710 .read_at(vec![0u8; 10], 0)
711 .await
712 .expect("Failed to read data");
713 assert_eq!(&read.as_ref()[..5], data1);
714 assert_eq!(&read.as_ref()[5..], data3);
715
716 let result = blob.read_at(vec![0u8; 10], 10).await;
718 assert!(result.is_err());
719 });
720 }
721
722 fn test_blob_resize<R: Runner>(runner: R)
723 where
724 R::Context: Storage,
725 {
726 runner.start(|context| async move {
727 let partition = "test_partition_resize";
728 let name = b"test_blob_resize";
729
730 let (blob, _) = context
732 .open(partition, name)
733 .await
734 .expect("Failed to open blob");
735
736 let data = b"some data";
737 blob.write_at(data.to_vec(), 0)
738 .await
739 .expect("Failed to write");
740 blob.sync().await.expect("Failed to sync after write");
741 blob.close()
742 .await
743 .expect("Failed to close blob after writing");
744
745 let (blob, len) = context.open(partition, name).await.unwrap();
747 assert_eq!(len, data.len() as u64);
748
749 let new_len = (data.len() as u64) * 2;
751 blob.resize(new_len)
752 .await
753 .expect("Failed to resize to extend");
754 blob.sync().await.expect("Failed to sync after resize");
755 blob.close()
756 .await
757 .expect("Failed to close blob after resizing");
758
759 let (blob, len) = context.open(partition, name).await.unwrap();
761 assert_eq!(len, new_len);
762
763 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
765 assert_eq!(read_buf.as_ref(), data);
766
767 let extended_part = blob
769 .read_at(vec![0; data.len()], data.len() as u64)
770 .await
771 .unwrap();
772 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
773
774 blob.resize(data.len() as u64).await.unwrap();
776 blob.sync().await.unwrap();
777 blob.close().await.unwrap();
778
779 let (blob, size) = context.open(partition, name).await.unwrap();
781 assert_eq!(size, data.len() as u64);
782
783 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
785 assert_eq!(read_buf.as_ref(), data);
786 blob.close().await.unwrap();
787 });
788 }
789
790 fn test_many_partition_read_write<R: Runner>(runner: R)
791 where
792 R::Context: Storage,
793 {
794 runner.start(|context| async move {
795 let partitions = ["partition1", "partition2", "partition3"];
796 let name = b"test_blob_rw";
797 let data1 = b"Hello";
798 let data2 = b"World";
799
800 for (additional, partition) in partitions.iter().enumerate() {
801 let (blob, _) = context
803 .open(partition, name)
804 .await
805 .expect("Failed to open blob");
806
807 blob.write_at(Vec::from(data1), 0)
809 .await
810 .expect("Failed to write data1");
811 blob.write_at(Vec::from(data2), 5 + additional as u64)
812 .await
813 .expect("Failed to write data2");
814
815 blob.close().await.expect("Failed to close blob");
817 }
818
819 for (additional, partition) in partitions.iter().enumerate() {
820 let (blob, len) = context
822 .open(partition, name)
823 .await
824 .expect("Failed to open blob");
825 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
826
827 let read = blob
829 .read_at(vec![0u8; 10 + additional], 0)
830 .await
831 .expect("Failed to read data");
832 assert_eq!(&read.as_ref()[..5], b"Hello");
833 assert_eq!(&read.as_ref()[5 + additional..], b"World");
834
835 blob.close().await.expect("Failed to close blob");
837 }
838 });
839 }
840
841 fn test_blob_read_past_length<R: Runner>(runner: R)
842 where
843 R::Context: Storage,
844 {
845 runner.start(|context| async move {
846 let partition = "test_partition";
847 let name = b"test_blob_rw";
848
849 let (blob, _) = context
851 .open(partition, name)
852 .await
853 .expect("Failed to open blob");
854
855 let result = blob.read_at(vec![0u8; 10], 0).await;
857 assert!(result.is_err());
858
859 let data = b"Hello, Storage!".to_vec();
861 blob.write_at(data, 0)
862 .await
863 .expect("Failed to write to blob");
864
865 let result = blob.read_at(vec![0u8; 20], 0).await;
867 assert!(result.is_err());
868 })
869 }
870
871 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
872 where
873 R::Context: Spawner + Storage + Metrics,
874 {
875 runner.start(|context| async move {
876 let partition = "test_partition";
877 let name = b"test_blob_rw";
878
879 let (blob, _) = context
881 .open(partition, name)
882 .await
883 .expect("Failed to open blob");
884
885 let data = b"Hello, Storage!";
887 blob.write_at(Vec::from(data), 0)
888 .await
889 .expect("Failed to write to blob");
890
891 blob.sync().await.expect("Failed to sync blob");
893
894 let check1 = context.with_label("check1").spawn({
896 let blob = blob.clone();
897 move |_| async move {
898 let read = blob
899 .read_at(vec![0u8; data.len()], 0)
900 .await
901 .expect("Failed to read from blob");
902 assert_eq!(read.as_ref(), data);
903 }
904 });
905 let check2 = context.with_label("check2").spawn({
906 let blob = blob.clone();
907 move |_| async move {
908 let read = blob
909 .read_at(vec![0; data.len()], 0)
910 .await
911 .expect("Failed to read from blob");
912 assert_eq!(read.as_ref(), data);
913 }
914 });
915
916 let result = join!(check1, check2);
918 assert!(result.0.is_ok());
919 assert!(result.1.is_ok());
920
921 let read = blob
923 .read_at(vec![0; data.len()], 0)
924 .await
925 .expect("Failed to read from blob");
926 assert_eq!(read.as_ref(), data);
927
928 blob.close().await.expect("Failed to close blob");
930
931 let buffer = context.encode();
933 assert!(buffer.contains("open_blobs 0"));
934 });
935 }
936
937 fn test_shutdown<R: Runner>(runner: R)
938 where
939 R::Context: Spawner + Metrics + Clock,
940 {
941 let kill = 9;
942 runner.start(|context| async move {
943 let before = context
945 .with_label("before")
946 .spawn(move |context| async move {
947 let sig = context.stopped().await;
948 assert_eq!(sig.unwrap(), kill);
949 });
950
951 let after = context
953 .with_label("after")
954 .spawn(move |context| async move {
955 let mut signal = context.stopped();
957 loop {
958 select! {
959 sig = &mut signal => {
960 assert_eq!(sig.unwrap(), kill);
962 break;
963 },
964 _ = context.sleep(Duration::from_millis(10)) => {
965 },
967 }
968 }
969 });
970
971 context.sleep(Duration::from_millis(50)).await;
973
974 context.stop(kill);
976
977 let result = join!(before, after);
979 assert!(result.0.is_ok());
980 assert!(result.1.is_ok());
981 });
982 }
983
984 fn test_spawn_ref<R: Runner>(runner: R)
985 where
986 R::Context: Spawner,
987 {
988 runner.start(|mut context| async move {
989 let handle = context.spawn_ref();
990 let result = handle(async move { 42 }).await;
991 assert!(matches!(result, Ok(42)));
992 });
993 }
994
995 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
996 where
997 R::Context: Spawner,
998 {
999 runner.start(|mut context| async move {
1000 let handle = context.spawn_ref();
1001 let result = handle(async move { 42 }).await;
1002 assert!(matches!(result, Ok(42)));
1003
1004 let handle = context.spawn_ref();
1006 let result = handle(async move { 42 }).await;
1007 assert!(matches!(result, Ok(42)));
1008 });
1009 }
1010
1011 fn test_spawn_duplicate<R: Runner>(runner: R)
1012 where
1013 R::Context: Spawner,
1014 {
1015 runner.start(|mut context| async move {
1016 let handle = context.spawn_ref();
1017 let result = handle(async move { 42 }).await;
1018 assert!(matches!(result, Ok(42)));
1019
1020 context.spawn(|_| async move { 42 });
1022 });
1023 }
1024
1025 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1026 where
1027 R::Context: Spawner,
1028 {
1029 runner.start(|context| async move {
1030 let handle = context.spawn_blocking(dedicated, |_| 42);
1031 let result = handle.await;
1032 assert!(matches!(result, Ok(42)));
1033 });
1034 }
1035
1036 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1037 where
1038 R::Context: Spawner,
1039 {
1040 runner.start(|mut context| async move {
1041 let spawn = context.spawn_blocking_ref(dedicated);
1042 let handle = spawn(|| 42);
1043 let result = handle.await;
1044 assert!(matches!(result, Ok(42)));
1045 });
1046 }
1047
1048 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1049 where
1050 R::Context: Spawner,
1051 {
1052 runner.start(|mut context| async move {
1053 let spawn = context.spawn_blocking_ref(dedicated);
1054 let result = spawn(|| 42).await;
1055 assert!(matches!(result, Ok(42)));
1056
1057 context.spawn_blocking(dedicated, |_| 42);
1059 });
1060 }
1061
1062 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1063 where
1064 R::Context: Spawner,
1065 {
1066 runner.start(|context| async move {
1067 let (sender, mut receiver) = oneshot::channel();
1069 let handle = context.spawn_blocking(dedicated, move |_| {
1070 loop {
1072 if receiver.try_recv().is_ok() {
1073 break;
1074 }
1075 }
1076
1077 let mut count = 0;
1079 loop {
1080 count += 1;
1081 if count >= 100_000_000 {
1082 break;
1083 }
1084 }
1085 count
1086 });
1087
1088 handle.abort();
1094 sender.send(()).unwrap();
1095
1096 assert!(matches!(handle.await, Ok(100_000_000)));
1098 });
1099 }
1100
1101 fn test_metrics<R: Runner>(runner: R)
1102 where
1103 R::Context: Metrics,
1104 {
1105 runner.start(|context| async move {
1106 assert_eq!(context.label(), "");
1108
1109 let counter = Counter::<u64>::default();
1111 context.register("test", "test", counter.clone());
1112
1113 counter.inc();
1115
1116 let buffer = context.encode();
1118 assert!(buffer.contains("test_total 1"));
1119
1120 let context = context.with_label("nested");
1122 let nested_counter = Counter::<u64>::default();
1123 context.register("test", "test", nested_counter.clone());
1124
1125 nested_counter.inc();
1127
1128 let buffer = context.encode();
1130 assert!(buffer.contains("nested_test_total 1"));
1131 assert!(buffer.contains("test_total 1"));
1132 });
1133 }
1134
1135 fn test_metrics_label<R: Runner>(runner: R)
1136 where
1137 R::Context: Metrics,
1138 {
1139 runner.start(|context| async move {
1140 context.with_label(METRICS_PREFIX);
1141 })
1142 }
1143
/// Runs `test_error_future` on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1149
/// Runs `test_clock_sleep` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1155
/// Runs `test_clock_sleep_until` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1161
/// Runs `test_root_finishes` on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1167
/// Runs `test_spawn_abort` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1173
/// Runs `test_panic_aborts_root` on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1179
/// Runs `test_panic_aborts_spawn` on the deterministic runtime, where the
/// spawned task's panic (message `blah`) is expected to fail the whole run.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1186
/// Runs `test_select` on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1192
/// Runs `test_select_loop` on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1198
/// Runs `test_storage_operations` on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1204
/// Runs `test_blob_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1210
/// Runs `test_blob_resize` on the deterministic runtime.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
1216
/// Runs `test_many_partition_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1222
/// Runs `test_blob_read_past_length` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1228
/// Runs `test_blob_clone_and_concurrent_read` on the deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1235
/// Runs `test_shutdown` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1241
/// Runs `test_spawn_ref` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1247
/// Runs `test_spawn_ref_duplicate` on the deterministic runtime and expects it
/// to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1254
/// Runs `test_spawn_duplicate` on the deterministic runtime and expects it to
/// panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1261
/// Runs `test_spawn_blocking` on the deterministic runtime for both values of
/// `dedicated`.
#[test]
fn test_deterministic_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(deterministic::Runner::default(), dedicated);
    });
}
1269
/// Checks that a panic inside a blocking task on the deterministic runtime
/// propagates with its original message, for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` rather than relying on
/// `#[should_panic]`: with `#[should_panic]` the test ends at the first panic,
/// so the `dedicated = true` iteration would never run.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let payload = catch_unwind(AssertUnwindSafe(|| {
            executor.start(|context| async move {
                let handle = context.spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                });
                handle.await.unwrap();
            });
        }))
        .expect_err("blocking task panic did not propagate");

        // Panic payloads are either `&str` or `String`, depending on how the
        // panic was raised.
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message (dedicated = {dedicated}): {message}"
        );
    }
}
1283
/// Runs `test_spawn_blocking_abort` on the deterministic runtime for both
/// values of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated);
    });
}
1291
/// Runs `test_spawn_blocking_ref` on the deterministic runtime for both values
/// of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated);
    });
}
1299
/// Checks that `test_spawn_blocking_ref_duplicate` panics on the deterministic
/// runtime for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` rather than relying on
/// `#[should_panic]`: with `#[should_panic]` the test ends at the first panic,
/// so the `dedicated = true` iteration would never run.
#[test]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = deterministic::Runner::default();
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic when reusing the context (dedicated = {dedicated})"
        );
    }
}
1308
/// Runs `test_metrics` on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1314
/// Runs `test_metrics_label` on the deterministic runtime and expects it to
/// panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1321
/// Runs `test_error_future` on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1327
/// Runs `test_clock_sleep` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1333
/// Runs `test_clock_sleep_until` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1339
/// Runs `test_root_finishes` on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1345
/// Runs `test_spawn_abort` on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1351
/// Runs `test_panic_aborts_root` on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1357
/// Runs `test_panic_aborts_spawn` on the tokio runtime, where the spawned
/// task's panic is expected to surface only through its handle.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1363
/// Runs `test_select` on the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1369
/// Runs `test_select_loop` on the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1375
/// Runs `test_storage_operations` on the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1381
/// Runs `test_blob_read_write` on the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1387
/// Runs `test_blob_resize` on the tokio runtime.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
1393
/// Runs `test_many_partition_read_write` on the tokio runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1399
/// Runs `test_blob_read_past_length` on the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1405
/// Runs `test_blob_clone_and_concurrent_read` on the tokio runtime.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1412
/// Runs `test_shutdown` on the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1418
/// Runs `test_spawn_ref` on the tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1424
/// Runs `test_spawn_ref_duplicate` on the tokio runtime and expects it to
/// panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1431
/// Runs `test_spawn_duplicate` on the tokio runtime and expects it to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1438
/// Runs `test_spawn_blocking` on the tokio runtime for both values of
/// `dedicated`.
#[test]
fn test_tokio_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    });
}
1446
/// Checks that, on the tokio runtime, a panic inside a blocking task surfaces
/// as `Error::Exited` on its handle, for both values of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    [false, true].into_iter().for_each(|dedicated| {
        let runner = tokio::Runner::default();
        runner.start(|ctx| async move {
            let task = ctx.spawn_blocking(dedicated, |_| {
                panic!("blocking task panicked");
            });
            let outcome = task.await;
            assert!(matches!(outcome, Err(Error::Exited)));
        });
    });
}
1460
/// Runs `test_spawn_blocking_abort` on the tokio runtime for both values of
/// `dedicated`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    });
}
1468
/// Runs `test_spawn_blocking_ref` on the tokio runtime for both values of
/// `dedicated`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    });
}
1476
/// Checks that `test_spawn_blocking_ref_duplicate` panics on the tokio runtime
/// for both values of `dedicated`.
///
/// Each iteration is wrapped in `catch_unwind` rather than relying on
/// `#[should_panic]`: with `#[should_panic]` the test ends at the first panic,
/// so the `dedicated = true` iteration would never run.
#[test]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let executor = tokio::Runner::default();
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic when reusing the context (dedicated = {dedicated})"
        );
    }
}
1485
/// Runs `test_metrics` on the tokio runtime.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1491
/// Runs `test_metrics_label` on the tokio runtime and expects it to panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1498
/// Starts the tokio telemetry endpoint on `127.0.0.1:8000`, registers a
/// counter, and fetches `/metrics` over a raw connection to check that the
/// counter is exported.
#[test]
fn test_tokio_telemetry() {
    /// Reads bytes one at a time up to `\n` and returns the line without its
    /// `\n` or `\r\n` terminator.
    async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
        let mut line = Vec::new();
        loop {
            let byte = stream.recv(vec![0; 1]).await?;
            if byte[0] == b'\n' {
                if line.last() == Some(&b'\r') {
                    line.pop();
                }
                break;
            }
            line.push(byte[0]);
        }
        String::from_utf8(line).map_err(|_| Error::ReadFailed)
    }

    /// Reads `Name: value` lines until the first empty line; lines without a
    /// `": "` separator are skipped.
    async fn read_headers<St: Stream>(
        stream: &mut St,
    ) -> Result<HashMap<String, String>, Error> {
        let mut headers = HashMap::new();
        loop {
            let line = read_line(stream).await?;
            if line.is_empty() {
                break;
            }
            if let Some((key, value)) = line.split_once(": ") {
                headers.insert(key.to_string(), value.to_string());
            }
        }
        Ok(headers)
    }

    /// Reads exactly `content_length` bytes and decodes them as UTF-8.
    async fn read_body<St: Stream>(
        stream: &mut St,
        content_length: usize,
    ) -> Result<String, Error> {
        let read = stream.recv(vec![0; content_length]).await?;
        String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
    }

    let runner = tokio::Runner::default();
    runner.start(|context| async move {
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry with the metrics endpoint bound to `address`.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and give it a non-zero value.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        let client = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry until the endpoint accepts connections.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok(connection) => break connection,
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Issue a minimal HTTP/1.1 request.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers, from which the body length is taken.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length: usize = headers
                    .get("content-length")
                    .unwrap()
                    .parse()
                    .unwrap();

                // Body must contain the registered counter.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Propagate any failure of the client task.
        client.await.unwrap();
    });
}
1606}