1use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23 future::Future,
24 io::Error as IoError,
25 net::SocketAddr,
26 time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
30#[macro_use]
31mod macros;
32
33pub mod deterministic;
34pub mod mocks;
35cfg_if::cfg_if! {
36 if #[cfg(not(target_arch = "wasm32"))] {
37 pub mod tokio;
38 pub mod benchmarks;
39 }
40}
41mod network;
42mod process;
43mod storage;
44pub mod telemetry;
45mod utils;
46pub use utils::*;
47#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
48mod iouring;
49
/// Label prefix that callers may not use for their own metrics: `Metrics::scoped_label`
/// asserts that no scoped label starts with this value.
const METRICS_PREFIX: &str = "runtime";
52
53#[derive(Error, Debug)]
55pub enum Error {
56 #[error("exited")]
57 Exited,
58 #[error("closed")]
59 Closed,
60 #[error("timeout")]
61 Timeout,
62 #[error("bind failed")]
63 BindFailed,
64 #[error("connection failed")]
65 ConnectionFailed,
66 #[error("write failed")]
67 WriteFailed,
68 #[error("read failed")]
69 ReadFailed,
70 #[error("send failed")]
71 SendFailed,
72 #[error("recv failed")]
73 RecvFailed,
74 #[error("partition creation failed: {0}")]
75 PartitionCreationFailed(String),
76 #[error("partition missing: {0}")]
77 PartitionMissing(String),
78 #[error("partition corrupt: {0}")]
79 PartitionCorrupt(String),
80 #[error("blob open failed: {0}/{1} error: {2}")]
81 BlobOpenFailed(String, String, IoError),
82 #[error("blob missing: {0}/{1}")]
83 BlobMissing(String, String),
84 #[error("blob resize failed: {0}/{1} error: {2}")]
85 BlobResizeFailed(String, String, IoError),
86 #[error("blob sync failed: {0}/{1} error: {2}")]
87 BlobSyncFailed(String, String, IoError),
88 #[error("blob insufficient length")]
89 BlobInsufficientLength,
90 #[error("offset overflow")]
91 OffsetOverflow,
92 #[error("io error: {0}")]
93 Io(#[from] IoError),
94}
95
/// Entry point of a runtime: consumes itself to run a root future.
pub trait Runner {
    /// Value passed to the root closure given to [`Runner::start`].
    type Context;

    /// Calls `f` with a [`Runner::Context`] and returns the output of the future `f` produced.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        Fut: Future,
        F: FnOnce(Self::Context) -> Fut;
}
112
113pub trait Spawner: Clone + Send + Sync + 'static {
115 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
123 where
124 F: FnOnce(Self) -> Fut + Send + 'static,
125 Fut: Future<Output = T> + Send + 'static,
126 T: Send + 'static;
127
128 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
137 where
138 F: Future<Output = T> + Send + 'static,
139 T: Send + 'static;
140
141 fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
160 where
161 F: FnOnce(Self) -> T + Send + 'static,
162 T: Send + 'static;
163
164 fn spawn_blocking_ref<F, T>(
173 &mut self,
174 dedicated: bool,
175 ) -> impl FnOnce(F) -> Handle<T> + 'static
176 where
177 F: FnOnce() -> T + Send + 'static,
178 T: Send + 'static;
179
180 fn stop(
200 self,
201 value: i32,
202 timeout: Option<Duration>,
203 ) -> impl Future<Output = Result<(), Error>> + Send;
204
205 fn stopped(&self) -> signal::Signal;
212}
213
214pub trait Metrics: Clone + Send + Sync + 'static {
216 fn label(&self) -> String;
218
219 fn with_label(&self, label: &str) -> Self;
227
228 fn scoped_label(&self, label: &str) -> String {
232 let label = if self.label().is_empty() {
233 label.to_string()
234 } else {
235 format!("{}_{}", self.label(), label)
236 };
237 assert!(
238 !label.starts_with(METRICS_PREFIX),
239 "using runtime label is not allowed"
240 );
241 label
242 }
243
244 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
248
249 fn encode(&self) -> String;
251}
252
/// Time source and timers provided by a runtime.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by this runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
268
269pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
271
272pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
274
275pub type ListenerOf<N> = <N as crate::Network>::Listener;
277
278pub trait Network: Clone + Send + Sync + 'static {
281 type Listener: Listener;
285
286 fn bind(
288 &self,
289 socket: SocketAddr,
290 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
291
292 fn dial(
294 &self,
295 socket: SocketAddr,
296 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
297}
298
299pub trait Listener: Sync + Send + 'static {
302 type Sink: Sink;
305 type Stream: Stream;
308
309 fn accept(
311 &mut self,
312 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
313
314 fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
316}
317
318pub trait Sink: Sync + Send + 'static {
321 fn send(
323 &mut self,
324 msg: impl Into<StableBuf> + Send,
325 ) -> impl Future<Output = Result<(), Error>> + Send;
326}
327
328pub trait Stream: Sync + Send + 'static {
331 fn recv(
334 &mut self,
335 buf: impl Into<StableBuf> + Send,
336 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
337}
338
339pub trait Storage: Clone + Send + Sync + 'static {
347 type Blob: Blob;
349
350 fn open(
356 &self,
357 partition: &str,
358 name: &[u8],
359 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
360
361 fn remove(
365 &self,
366 partition: &str,
367 name: Option<&[u8]>,
368 ) -> impl Future<Output = Result<(), Error>> + Send;
369
370 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
372}
373
374#[allow(clippy::len_without_is_empty)]
389pub trait Blob: Clone + Send + Sync + 'static {
390 fn read_at(
395 &self,
396 buf: impl Into<StableBuf> + Send,
397 offset: u64,
398 ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
399
400 fn write_at(
402 &self,
403 buf: impl Into<StableBuf> + Send,
404 offset: u64,
405 ) -> impl Future<Output = Result<(), Error>> + Send;
406
407 fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
412
413 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420 use bytes::Bytes;
421 use commonware_macros::select;
422 use futures::{
423 channel::{mpsc, oneshot},
424 future::{pending, ready},
425 join, pin_mut, FutureExt, SinkExt, StreamExt,
426 };
427 use prometheus_client::metrics::counter::Counter;
428 use std::{
429 collections::HashMap,
430 panic::{catch_unwind, AssertUnwindSafe},
431 str::FromStr,
432 sync::{
433 atomic::{AtomicU32, Ordering},
434 Arc, Mutex,
435 },
436 };
437 use tracing::{error, Level};
438 use utils::reschedule;
439
440 fn test_error_future<R: Runner>(runner: R) {
441 async fn error_future() -> Result<&'static str, &'static str> {
442 Err("An error occurred")
443 }
444 let result = runner.start(|_| error_future());
445 assert_eq!(result, Err("An error occurred"));
446 }
447
448 fn test_clock_sleep<R: Runner>(runner: R)
449 where
450 R::Context: Spawner + Clock,
451 {
452 runner.start(|context| async move {
453 let start = context.current();
455 let sleep_duration = Duration::from_millis(10);
456 context.sleep(sleep_duration).await;
457
458 let end = context.current();
460 assert!(end.duration_since(start).unwrap() >= sleep_duration);
461 });
462 }
463
464 fn test_clock_sleep_until<R: Runner>(runner: R)
465 where
466 R::Context: Spawner + Clock,
467 {
468 runner.start(|context| async move {
469 let now = context.current();
471 context.sleep_until(now + Duration::from_millis(100)).await;
472
473 let elapsed = now.elapsed().unwrap();
475 assert!(elapsed >= Duration::from_millis(100));
476 });
477 }
478
479 fn test_root_finishes<R: Runner>(runner: R)
480 where
481 R::Context: Spawner,
482 {
483 runner.start(|context| async move {
484 context.spawn(|_| async move {
485 loop {
486 reschedule().await;
487 }
488 });
489 });
490 }
491
492 fn test_spawn_abort<R: Runner>(runner: R)
493 where
494 R::Context: Spawner,
495 {
496 runner.start(|context| async move {
497 let handle = context.spawn(|_| async move {
498 loop {
499 reschedule().await;
500 }
501 });
502 handle.abort();
503 assert!(matches!(handle.await, Err(Error::Closed)));
504 });
505 }
506
507 fn test_panic_aborts_root<R: Runner>(runner: R) {
508 let result = catch_unwind(AssertUnwindSafe(|| {
509 runner.start(|_| async move {
510 panic!("blah");
511 });
512 }));
513 result.unwrap_err();
514 }
515
516 fn test_panic_aborts_spawn<R: Runner>(runner: R)
517 where
518 R::Context: Spawner,
519 {
520 let result = runner.start(|context| async move {
521 let result = context.spawn(|_| async move {
522 panic!("blah");
523 });
524 assert!(matches!(result.await, Err(Error::Exited)));
525 Result::<(), Error>::Ok(())
526 });
527
528 result.unwrap();
530 }
531
532 fn test_select<R: Runner>(runner: R) {
533 runner.start(|_| async move {
534 let output = Mutex::new(0);
536 select! {
537 v1 = ready(1) => {
538 *output.lock().unwrap() = v1;
539 },
540 v2 = ready(2) => {
541 *output.lock().unwrap() = v2;
542 },
543 };
544 assert_eq!(*output.lock().unwrap(), 1);
545
546 select! {
548 v1 = std::future::pending::<i32>() => {
549 *output.lock().unwrap() = v1;
550 },
551 v2 = ready(2) => {
552 *output.lock().unwrap() = v2;
553 },
554 };
555 assert_eq!(*output.lock().unwrap(), 2);
556 });
557 }
558
559 fn test_select_loop<R: Runner>(runner: R)
561 where
562 R::Context: Clock,
563 {
564 runner.start(|context| async move {
565 let (mut sender, mut receiver) = mpsc::unbounded();
567 for _ in 0..2 {
568 select! {
569 v = receiver.next() => {
570 panic!("unexpected value: {v:?}");
571 },
572 _ = context.sleep(Duration::from_millis(100)) => {
573 continue;
574 },
575 };
576 }
577
578 sender.send(0).await.unwrap();
580 sender.send(1).await.unwrap();
581
582 select! {
584 _ = async {} => {
585 },
587 v = receiver.next() => {
588 panic!("unexpected value: {v:?}");
589 },
590 };
591
592 for i in 0..2 {
594 select! {
595 _ = context.sleep(Duration::from_millis(100)) => {
596 panic!("timeout");
597 },
598 v = receiver.next() => {
599 assert_eq!(v.unwrap(), i);
600 },
601 };
602 }
603 });
604 }
605
606 fn test_storage_operations<R: Runner>(runner: R)
607 where
608 R::Context: Storage,
609 {
610 runner.start(|context| async move {
611 let partition = "test_partition";
612 let name = b"test_blob";
613
614 let (blob, _) = context
616 .open(partition, name)
617 .await
618 .expect("Failed to open blob");
619
620 let data = b"Hello, Storage!";
622 blob.write_at(Vec::from(data), 0)
623 .await
624 .expect("Failed to write to blob");
625
626 blob.sync().await.expect("Failed to sync blob");
628
629 let read = blob
631 .read_at(vec![0; data.len()], 0)
632 .await
633 .expect("Failed to read from blob");
634 assert_eq!(read.as_ref(), data);
635
636 blob.sync().await.expect("Failed to sync blob");
638
639 let blobs = context
641 .scan(partition)
642 .await
643 .expect("Failed to scan partition");
644 assert!(blobs.contains(&name.to_vec()));
645
646 let (blob, len) = context
648 .open(partition, name)
649 .await
650 .expect("Failed to reopen blob");
651 assert_eq!(len, data.len() as u64);
652
653 let read = blob
655 .read_at(vec![0u8; 7], 7)
656 .await
657 .expect("Failed to read data");
658 assert_eq!(read.as_ref(), b"Storage");
659
660 blob.sync().await.expect("Failed to sync blob");
662
663 context
665 .remove(partition, Some(name))
666 .await
667 .expect("Failed to remove blob");
668
669 let blobs = context
671 .scan(partition)
672 .await
673 .expect("Failed to scan partition");
674 assert!(!blobs.contains(&name.to_vec()));
675
676 context
678 .remove(partition, None)
679 .await
680 .expect("Failed to remove partition");
681
682 let result = context.scan(partition).await;
684 assert!(matches!(result, Err(Error::PartitionMissing(_))));
685 });
686 }
687
688 fn test_blob_read_write<R: Runner>(runner: R)
689 where
690 R::Context: Storage,
691 {
692 runner.start(|context| async move {
693 let partition = "test_partition";
694 let name = b"test_blob_rw";
695
696 let (blob, _) = context
698 .open(partition, name)
699 .await
700 .expect("Failed to open blob");
701
702 let data1 = b"Hello";
704 let data2 = b"World";
705 blob.write_at(Vec::from(data1), 0)
706 .await
707 .expect("Failed to write data1");
708 blob.write_at(Vec::from(data2), 5)
709 .await
710 .expect("Failed to write data2");
711
712 let read = blob
714 .read_at(vec![0u8; 10], 0)
715 .await
716 .expect("Failed to read data");
717 assert_eq!(&read.as_ref()[..5], data1);
718 assert_eq!(&read.as_ref()[5..], data2);
719
720 let result = blob.read_at(vec![0u8; 10], 10).await;
722 assert!(result.is_err());
723
724 let data3 = b"Store";
726 blob.write_at(Vec::from(data3), 5)
727 .await
728 .expect("Failed to write data3");
729
730 let read = blob
732 .read_at(vec![0u8; 10], 0)
733 .await
734 .expect("Failed to read data");
735 assert_eq!(&read.as_ref()[..5], data1);
736 assert_eq!(&read.as_ref()[5..], data3);
737
738 let result = blob.read_at(vec![0u8; 10], 10).await;
740 assert!(result.is_err());
741 });
742 }
743
744 fn test_blob_resize<R: Runner>(runner: R)
745 where
746 R::Context: Storage,
747 {
748 runner.start(|context| async move {
749 let partition = "test_partition_resize";
750 let name = b"test_blob_resize";
751
752 let (blob, _) = context
754 .open(partition, name)
755 .await
756 .expect("Failed to open blob");
757
758 let data = b"some data";
759 blob.write_at(data.to_vec(), 0)
760 .await
761 .expect("Failed to write");
762 blob.sync().await.expect("Failed to sync after write");
763
764 let (blob, len) = context.open(partition, name).await.unwrap();
766 assert_eq!(len, data.len() as u64);
767
768 let new_len = (data.len() as u64) * 2;
770 blob.resize(new_len)
771 .await
772 .expect("Failed to resize to extend");
773 blob.sync().await.expect("Failed to sync after resize");
774
775 let (blob, len) = context.open(partition, name).await.unwrap();
777 assert_eq!(len, new_len);
778
779 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
781 assert_eq!(read_buf.as_ref(), data);
782
783 let extended_part = blob
785 .read_at(vec![0; data.len()], data.len() as u64)
786 .await
787 .unwrap();
788 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
789
790 blob.resize(data.len() as u64).await.unwrap();
792 blob.sync().await.unwrap();
793
794 let (blob, size) = context.open(partition, name).await.unwrap();
796 assert_eq!(size, data.len() as u64);
797
798 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
800 assert_eq!(read_buf.as_ref(), data);
801 blob.sync().await.unwrap();
802 });
803 }
804
805 fn test_many_partition_read_write<R: Runner>(runner: R)
806 where
807 R::Context: Storage,
808 {
809 runner.start(|context| async move {
810 let partitions = ["partition1", "partition2", "partition3"];
811 let name = b"test_blob_rw";
812 let data1 = b"Hello";
813 let data2 = b"World";
814
815 for (additional, partition) in partitions.iter().enumerate() {
816 let (blob, _) = context
818 .open(partition, name)
819 .await
820 .expect("Failed to open blob");
821
822 blob.write_at(Vec::from(data1), 0)
824 .await
825 .expect("Failed to write data1");
826 blob.write_at(Vec::from(data2), 5 + additional as u64)
827 .await
828 .expect("Failed to write data2");
829
830 blob.sync().await.expect("Failed to sync blob");
832 }
833
834 for (additional, partition) in partitions.iter().enumerate() {
835 let (blob, len) = context
837 .open(partition, name)
838 .await
839 .expect("Failed to open blob");
840 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
841
842 let read = blob
844 .read_at(vec![0u8; 10 + additional], 0)
845 .await
846 .expect("Failed to read data");
847 assert_eq!(&read.as_ref()[..5], b"Hello");
848 assert_eq!(&read.as_ref()[5 + additional..], b"World");
849 }
850 });
851 }
852
853 fn test_blob_read_past_length<R: Runner>(runner: R)
854 where
855 R::Context: Storage,
856 {
857 runner.start(|context| async move {
858 let partition = "test_partition";
859 let name = b"test_blob_rw";
860
861 let (blob, _) = context
863 .open(partition, name)
864 .await
865 .expect("Failed to open blob");
866
867 let result = blob.read_at(vec![0u8; 10], 0).await;
869 assert!(result.is_err());
870
871 let data = b"Hello, Storage!".to_vec();
873 blob.write_at(data, 0)
874 .await
875 .expect("Failed to write to blob");
876
877 let result = blob.read_at(vec![0u8; 20], 0).await;
879 assert!(result.is_err());
880 })
881 }
882
883 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
884 where
885 R::Context: Spawner + Storage + Metrics,
886 {
887 runner.start(|context| async move {
888 let partition = "test_partition";
889 let name = b"test_blob_rw";
890
891 let (blob, _) = context
893 .open(partition, name)
894 .await
895 .expect("Failed to open blob");
896
897 let data = b"Hello, Storage!";
899 blob.write_at(Vec::from(data), 0)
900 .await
901 .expect("Failed to write to blob");
902
903 blob.sync().await.expect("Failed to sync blob");
905
906 let check1 = context.with_label("check1").spawn({
908 let blob = blob.clone();
909 move |_| async move {
910 let read = blob
911 .read_at(vec![0u8; data.len()], 0)
912 .await
913 .expect("Failed to read from blob");
914 assert_eq!(read.as_ref(), data);
915 }
916 });
917 let check2 = context.with_label("check2").spawn({
918 let blob = blob.clone();
919 move |_| async move {
920 let read = blob
921 .read_at(vec![0; data.len()], 0)
922 .await
923 .expect("Failed to read from blob");
924 assert_eq!(read.as_ref(), data);
925 }
926 });
927
928 let result = join!(check1, check2);
930 assert!(result.0.is_ok());
931 assert!(result.1.is_ok());
932
933 let read = blob
935 .read_at(vec![0; data.len()], 0)
936 .await
937 .expect("Failed to read from blob");
938 assert_eq!(read.as_ref(), data);
939
940 drop(blob);
942
943 let buffer = context.encode();
945 assert!(buffer.contains("open_blobs 0"));
946 });
947 }
948
949 fn test_shutdown<R: Runner>(runner: R)
950 where
951 R::Context: Spawner + Metrics + Clock,
952 {
953 let kill = 9;
954 runner.start(|context| async move {
955 let before = context
957 .with_label("before")
958 .spawn(move |context| async move {
959 let mut signal = context.stopped();
960 let value = (&mut signal).await.unwrap();
961 assert_eq!(value, kill);
962 drop(signal);
963 });
964
965 let result = context.clone().stop(kill, None).await;
967 assert!(result.is_ok());
968
969 let after = context
971 .with_label("after")
972 .spawn(move |context| async move {
973 let value = context.stopped().await.unwrap();
975 assert_eq!(value, kill);
976 });
977
978 let result = join!(before, after);
980 assert!(result.0.is_ok());
981 assert!(result.1.is_ok());
982 });
983 }
984
985 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
986 where
987 R::Context: Spawner + Metrics + Clock,
988 {
989 let kill = 42;
990 runner.start(|context| async move {
991 let (started_tx, mut started_rx) = mpsc::channel(3);
992 let counter = Arc::new(AtomicU32::new(0));
993
994 let task = |cleanup_duration: Duration| {
997 let context = context.clone();
998 let counter = counter.clone();
999 let mut started_tx = started_tx.clone();
1000 context.spawn(move |context| async move {
1001 let mut signal = context.stopped();
1003 started_tx.send(()).await.unwrap();
1004
1005 let value = (&mut signal).await.unwrap();
1007 assert_eq!(value, kill);
1008 context.sleep(cleanup_duration).await;
1009 counter.fetch_add(1, Ordering::SeqCst);
1010
1011 drop(signal);
1013 })
1014 };
1015
1016 let task1 = task(Duration::from_millis(10));
1017 let task2 = task(Duration::from_millis(20));
1018 let task3 = task(Duration::from_millis(30));
1019
1020 for _ in 0..3 {
1022 started_rx.next().await.unwrap();
1023 }
1024
1025 context.stop(kill, None).await.unwrap();
1027 assert_eq!(counter.load(Ordering::SeqCst), 3);
1028
1029 let result = join!(task1, task2, task3);
1031 assert!(result.0.is_ok());
1032 assert!(result.1.is_ok());
1033 assert!(result.2.is_ok());
1034 });
1035 }
1036
1037 fn test_shutdown_timeout<R: Runner>(runner: R)
1038 where
1039 R::Context: Spawner + Metrics + Clock,
1040 {
1041 let kill = 42;
1042 runner.start(|context| async move {
1043 let (started_tx, started_rx) = oneshot::channel();
1045
1046 context.clone().spawn(move |context| async move {
1048 let signal = context.stopped();
1049 started_tx.send(()).unwrap();
1050 pending::<()>().await;
1051 signal.await.unwrap();
1052 });
1053
1054 started_rx.await.unwrap();
1056 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1057
1058 assert!(matches!(result, Err(Error::Timeout)));
1060 });
1061 }
1062
1063 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1064 where
1065 R::Context: Spawner + Metrics + Clock,
1066 {
1067 let kill1 = 42;
1068 let kill2 = 43;
1069
1070 runner.start(|context| async move {
1071 let (started_tx, started_rx) = oneshot::channel();
1072 let counter = Arc::new(AtomicU32::new(0));
1073
1074 let task = context.with_label("blocking_task").spawn({
1076 let counter = counter.clone();
1077 move |context| async move {
1078 let mut signal = context.stopped();
1080 started_tx.send(()).unwrap();
1081
1082 let value = (&mut signal).await.unwrap();
1084 assert_eq!(value, kill1);
1085 context.sleep(Duration::from_millis(50)).await;
1086
1087 counter.fetch_add(1, Ordering::SeqCst);
1089 drop(signal);
1090 }
1091 });
1092
1093 started_rx.await.unwrap();
1095
1096 let stop_task1 = context.clone().stop(kill1, None);
1099 pin_mut!(stop_task1);
1100 let stop_task2 = context.clone().stop(kill2, None);
1101 pin_mut!(stop_task2);
1102
1103 assert!(stop_task1.as_mut().now_or_never().is_none());
1105 assert!(stop_task2.as_mut().now_or_never().is_none());
1106
1107 assert!(stop_task1.await.is_ok());
1109 assert!(stop_task2.await.is_ok());
1110
1111 let sig = context.stopped().await;
1113 assert_eq!(sig.unwrap(), kill1);
1114
1115 let result = task.await;
1117 assert!(result.is_ok());
1118 assert_eq!(counter.load(Ordering::SeqCst), 1);
1119
1120 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1122 });
1123 }
1124
1125 fn test_spawn_ref<R: Runner>(runner: R)
1126 where
1127 R::Context: Spawner,
1128 {
1129 runner.start(|mut context| async move {
1130 let handle = context.spawn_ref();
1131 let result = handle(async move { 42 }).await;
1132 assert!(matches!(result, Ok(42)));
1133 });
1134 }
1135
1136 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1137 where
1138 R::Context: Spawner,
1139 {
1140 runner.start(|mut context| async move {
1141 let handle = context.spawn_ref();
1142 let result = handle(async move { 42 }).await;
1143 assert!(matches!(result, Ok(42)));
1144
1145 let handle = context.spawn_ref();
1147 let result = handle(async move { 42 }).await;
1148 assert!(matches!(result, Ok(42)));
1149 });
1150 }
1151
1152 fn test_spawn_duplicate<R: Runner>(runner: R)
1153 where
1154 R::Context: Spawner,
1155 {
1156 runner.start(|mut context| async move {
1157 let handle = context.spawn_ref();
1158 let result = handle(async move { 42 }).await;
1159 assert!(matches!(result, Ok(42)));
1160
1161 context.spawn(|_| async move { 42 });
1163 });
1164 }
1165
1166 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1167 where
1168 R::Context: Spawner,
1169 {
1170 runner.start(|context| async move {
1171 let handle = context.spawn_blocking(dedicated, |_| 42);
1172 let result = handle.await;
1173 assert!(matches!(result, Ok(42)));
1174 });
1175 }
1176
1177 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1178 where
1179 R::Context: Spawner,
1180 {
1181 runner.start(|mut context| async move {
1182 let spawn = context.spawn_blocking_ref(dedicated);
1183 let handle = spawn(|| 42);
1184 let result = handle.await;
1185 assert!(matches!(result, Ok(42)));
1186 });
1187 }
1188
1189 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1190 where
1191 R::Context: Spawner,
1192 {
1193 runner.start(|mut context| async move {
1194 let spawn = context.spawn_blocking_ref(dedicated);
1195 let result = spawn(|| 42).await;
1196 assert!(matches!(result, Ok(42)));
1197
1198 context.spawn_blocking(dedicated, |_| 42);
1200 });
1201 }
1202
1203 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1204 where
1205 R::Context: Spawner,
1206 {
1207 runner.start(|context| async move {
1208 let (sender, mut receiver) = oneshot::channel();
1210 let handle = context.spawn_blocking(dedicated, move |_| {
1211 loop {
1213 if receiver.try_recv().is_ok() {
1214 break;
1215 }
1216 }
1217
1218 let mut count = 0;
1220 loop {
1221 count += 1;
1222 if count >= 100_000_000 {
1223 break;
1224 }
1225 }
1226 count
1227 });
1228
1229 handle.abort();
1235 sender.send(()).unwrap();
1236
1237 assert!(matches!(handle.await, Ok(100_000_000)));
1239 });
1240 }
1241
1242 fn test_metrics<R: Runner>(runner: R)
1243 where
1244 R::Context: Metrics,
1245 {
1246 runner.start(|context| async move {
1247 assert_eq!(context.label(), "");
1249
1250 let counter = Counter::<u64>::default();
1252 context.register("test", "test", counter.clone());
1253
1254 counter.inc();
1256
1257 let buffer = context.encode();
1259 assert!(buffer.contains("test_total 1"));
1260
1261 let context = context.with_label("nested");
1263 let nested_counter = Counter::<u64>::default();
1264 context.register("test", "test", nested_counter.clone());
1265
1266 nested_counter.inc();
1268
1269 let buffer = context.encode();
1271 assert!(buffer.contains("nested_test_total 1"));
1272 assert!(buffer.contains("test_total 1"));
1273 });
1274 }
1275
1276 fn test_metrics_label<R: Runner>(runner: R)
1277 where
1278 R::Context: Metrics,
1279 {
1280 runner.start(|context| async move {
1281 context.with_label(METRICS_PREFIX);
1282 })
1283 }
1284
/// Runs `test_error_future` on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
1290
/// Runs `test_clock_sleep` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
1296
/// Runs `test_clock_sleep_until` on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
1302
/// Runs `test_root_finishes` on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1308
/// Runs `test_spawn_abort` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1314
/// Runs `test_panic_aborts_root` on the deterministic runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1320
/// Runs `test_panic_aborts_spawn` on the deterministic runtime, where the
/// spawned task's panic (`blah`) is expected to fail the whole run.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1327
/// Runs `test_select` on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1333
/// Runs `test_select_loop` on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1339
/// Runs `test_storage_operations` on the deterministic runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1345
/// Runs `test_blob_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1351
/// Runs `test_blob_resize` on the deterministic runtime.
#[test]
fn test_deterministic_blob_resize() {
    test_blob_resize(deterministic::Runner::default());
}
1357
/// Runs `test_many_partition_read_write` on the deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1363
/// Runs `test_blob_read_past_length` on the deterministic runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1369
/// Runs `test_blob_clone_and_concurrent_read` on the deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1376
/// Runs `test_shutdown` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1382
/// Runs `test_shutdown_multiple_signals` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(deterministic::Runner::default());
}
1388
/// Runs `test_shutdown_timeout` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown_timeout() {
    test_shutdown_timeout(deterministic::Runner::default());
}
1394
/// Runs `test_shutdown_multiple_stop_calls` on the deterministic runtime.
#[test]
fn test_deterministic_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(deterministic::Runner::default());
}
1400
/// Runs `test_spawn_ref` on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1406
/// Runs `test_spawn_ref_duplicate` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1413
/// Runs `test_spawn_duplicate` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1420
/// Runs `test_spawn_blocking` on the deterministic runtime for both values of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking() {
    [false, true]
        .into_iter()
        .for_each(|dedicated| test_spawn_blocking(deterministic::Runner::default(), dedicated));
}
1428
/// Checks that a panic inside a blocking task surfaces from the deterministic
/// runtime with its original message, for both values of `dedicated`.
///
/// Each iteration catches its own panic. With `#[should_panic]` around the
/// loop, the first panic would end the test and `dedicated = true` would never run.
#[test]
fn test_deterministic_spawn_blocking_panic() {
    for dedicated in [false, true] {
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let executor = deterministic::Runner::default();
            executor.start(|context| async move {
                let handle = context.spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                });
                handle.await.unwrap();
            });
        }));

        let payload = match outcome {
            Ok(()) => panic!("expected a panic with dedicated={dedicated}"),
            Err(payload) => payload,
        };

        // Panic payloads are `&str` for literal messages and `String` for formatted ones.
        let message = payload
            .downcast_ref::<&str>()
            .map(|text| text.to_string())
            .or_else(|| payload.downcast_ref::<String>().cloned())
            .unwrap_or_default();
        assert!(
            message.contains("blocking task panicked"),
            "unexpected panic message with dedicated={dedicated}: {message:?}"
        );
    }
}
1442
/// Runs `test_spawn_blocking_abort` on the deterministic runtime for both values of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(deterministic::Runner::default(), dedicated)
    });
}
1450
/// Runs `test_spawn_blocking_ref` on the deterministic runtime for both values of `dedicated`.
#[test]
fn test_deterministic_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(deterministic::Runner::default(), dedicated)
    });
}
1458
/// Checks that `test_spawn_blocking_ref_duplicate` panics on the deterministic
/// runtime for both values of `dedicated`.
///
/// Each iteration catches its own panic. With `#[should_panic]` around the
/// loop, the first panic would end the test and `dedicated = true` would never run.
#[test]
fn test_deterministic_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let executor = deterministic::Runner::default();
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        }));
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated={dedicated}"
        );
    }
}
1467
/// Runs `test_metrics` on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1473
/// Runs `test_metrics_label` on the deterministic runtime and expects a panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1480
/// Runs `test_error_future` on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1486
/// Runs `test_clock_sleep` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1492
/// Runs `test_clock_sleep_until` on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1498
/// Runs `test_root_finishes` on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1504
/// Runs `test_spawn_abort` on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1510
/// Runs `test_panic_aborts_root` on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1516
/// Runs `test_panic_aborts_spawn` on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1522
/// Runs `test_select` on the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1528
/// Runs the shared `test_select_loop` scenario on a default tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1534
/// Runs the shared `test_storage_operations` scenario on a default tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1540
/// Runs the shared `test_blob_read_write` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1546
/// Runs the shared `test_blob_resize` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
1552
/// Runs the shared `test_many_partition_read_write` scenario on a default
/// tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1558
/// Runs the shared `test_blob_read_past_length` scenario on a default tokio
/// runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1564
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on a default
/// tokio runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1571
/// Runs the shared `test_shutdown` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1577
/// Runs the shared `test_shutdown_multiple_signals` scenario on a default
/// tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
1583
/// Runs the shared `test_shutdown_timeout` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
1589
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on a default
/// tokio runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
1595
/// Runs the shared `test_spawn_ref` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1601
/// Runs the shared `test_spawn_ref_duplicate` scenario on a default tokio
/// runner; the scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1608
/// Runs the shared `test_spawn_duplicate` scenario on a default tokio runner;
/// the scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1615
/// Runs the shared `test_spawn_blocking` scenario on a fresh default tokio
/// runner, once for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    });
}
1623
/// Checks that awaiting a blocking task whose closure panics resolves to
/// `Err(Error::Exited)` on the tokio runner, for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for dedicated in [false, true] {
        tokio::Runner::default().start(|context| async move {
            // Spawn a blocking task that panics immediately and wait for it.
            let outcome = context
                .spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                })
                .await;
            assert!(matches!(outcome, Err(Error::Exited)));
        });
    }
}
1637
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh default
/// tokio runner, once for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    });
}
1645
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh default tokio
/// runner, once for each value of `dedicated`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    [false, true].into_iter().for_each(|dedicated| {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    });
}
1653
/// Runs the shared `test_spawn_blocking_ref_duplicate` scenario on a fresh
/// default tokio runner and requires it to panic for *both* values of
/// `dedicated`.
///
/// A function-level `#[should_panic]` is satisfied by the first panic, which
/// would end the test before `dedicated = true` is ever executed. Catching the
/// unwind per iteration makes every configuration prove that it panics.
#[test]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for dedicated in [false, true] {
        // The runner is built inside the closure so that only the `bool` is
        // captured, which keeps the closure trivially unwind-safe.
        let outcome = std::panic::catch_unwind(move || {
            let executor = tokio::Runner::default();
            test_spawn_blocking_ref_duplicate(executor, dedicated);
        });
        assert!(
            outcome.is_err(),
            "expected a panic with dedicated={dedicated}"
        );
    }
}
1662
/// Runs the shared `test_metrics` scenario on a default tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1668
/// Runs the shared `test_metrics_label` scenario on a default tokio runner;
/// the scenario is expected to panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1675
/// Polls the encoded metrics of a default tokio runner until an unlabeled
/// `runtime_process_rss` sample reports a positive value.
///
/// The test sleeps 100ms between polls on *every* unsuccessful pass. This
/// avoids spinning without yielding when the metric name is already present
/// in the output but has not yet been populated with a positive value.
///
/// # Panics
///
/// Panics if the value column of a matching sample line is not a valid `i64`.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            let metrics = context.encode();

            // Only consider sample lines for the bare metric name: labeled
            // series (`runtime_process_rss{...}`) are skipped, and comment
            // lines do not start with the metric name.
            let reported = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                // The value is the second whitespace-separated column.
                .filter_map(|line| line.split_whitespace().nth(1))
                .any(|value| {
                    value
                        .parse::<i64>()
                        .expect("Failed to parse RSS value")
                        > 0
                });
            if reported {
                return;
            }

            // Not available (or not positive) yet: yield before retrying.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
1706
/// End-to-end check of the tokio telemetry endpoint: initializes telemetry
/// with a listen address, registers and increments a counter, then issues a
/// raw `GET /metrics` HTTP request over the runtime's network API and asserts
/// that the response body contains the counter.
///
/// Header names are lowercased while parsing because HTTP field names are
/// case-insensitive; the `content-length` lookup therefore does not depend on
/// the casing the server happens to emit.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Address handed to telemetry and later dialed by the client.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Initialize telemetry under the "metrics" label.
        // NOTE(review): `Some(address)` is presumably the metrics listen
        // address and `None` disables an optional exporter; confirm against
        // `tokio::telemetry::init`.
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and bump it once so the scrape has a known value.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes one at a time until `\n`, returning the line without
        /// its terminator (a trailing `\r` is dropped as well).
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines until the first empty line. Lines of the form
        /// `name: value` are stored with the name lowercased; other lines are
        /// ignored.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_ascii_lowercase(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
        }

        // Drive the HTTP exchange from a separately spawned client task.
        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial until it succeeds, backing off 10ms between
                // attempts.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send a minimal HTTP/1.1 request for the metrics page.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Status line must indicate success.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Parse headers to learn how many body bytes to read.
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length = headers
                    .get("content-length")
                    .expect("response is missing a content-length header")
                    .parse::<usize>()
                    .unwrap();

                // The body must expose the counter registered above.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Surface any client-side failure in the test result.
        client_handle.await.unwrap();
    });
}
1814}