1use commonware_utils::StableBuf;
21use prometheus_client::registry::Metric;
22use std::{
23 future::Future,
24 io::Error as IoError,
25 net::SocketAddr,
26 time::{Duration, SystemTime},
27};
28use thiserror::Error;
29
// Crate-internal macros; `#[macro_use]` makes them available to the modules declared below.
#[macro_use]
mod macros;

pub mod deterministic;
pub mod mocks;
// `tokio` and `benchmarks` are only compiled when the target architecture is not `wasm32`.
cfg_if::cfg_if! {
    if #[cfg(not(target_arch = "wasm32"))] {
        pub mod tokio;
        pub mod benchmarks;
    }
}
mod network;
mod storage;
pub mod telemetry;
mod utils;
// Every public item of `utils` is re-exported from this module.
pub use utils::*;
// Only compiled when at least one of the two `iouring-*` features is enabled.
#[cfg(any(feature = "iouring-storage", feature = "iouring-network"))]
mod iouring;
48
/// Prefix that caller-supplied metric labels must not start with; the assertion in
/// `Metrics::scoped_label` rejects any label beginning with it.
const METRICS_PREFIX: &str = "runtime";
51
/// Errors reported by the runtime traits declared in this file.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
#[derive(Error, Debug)]
pub enum Error {
    /// The tests in this file expect this when awaiting the handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// The tests in this file expect this when awaiting the handle of an aborted task.
    #[error("closed")]
    Closed,
    /// The tests in this file expect this from `Spawner::stop` when its timeout elapses.
    #[error("timeout")]
    Timeout,
    #[error("bind failed")]
    BindFailed,
    #[error("connection failed")]
    ConnectionFailed,
    #[error("write failed")]
    WriteFailed,
    #[error("read failed")]
    ReadFailed,
    #[error("send failed")]
    SendFailed,
    #[error("recv failed")]
    RecvFailed,
    /// Carries one string, rendered after the message.
    /// NOTE(review): presumably the partition name — confirm against the storage backends.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// The tests in this file expect this when scanning a partition that was removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Two strings rendered as `{0}/{1}` plus the underlying I/O error.
    /// NOTE(review): presumably partition and blob name — confirm against the storage backends.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    #[error("blob resize failed: {0}/{1} error: {2}")]
    BlobResizeFailed(String, String, IoError),
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    #[error("offset overflow")]
    OffsetOverflow,
    /// Wraps a `std::io::Error`; `#[from]` derives the `From<IoError>` conversion used by `?`.
    #[error("io error: {0}")]
    Io(#[from] IoError),
}
94
/// Entry point of a runtime: consumes itself to run a root future.
pub trait Runner {
    /// Context value handed to the closure passed to [`Runner::start`].
    type Context;

    /// Calls `f` with a context and returns the output of the future that `f` produces.
    ///
    /// NOTE(review): `test_root_finishes` expects this to return even while a spawned task
    /// is still looping — confirm how each implementation treats outstanding tasks.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
111
/// Interface for spawning tasks and coordinating shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Consumes this context, passes it to `f`, and spawns the future `f` returns.
    ///
    /// The returned [`Handle`] can be awaited for the task's output. The tests in this file
    /// expect `Err(Error::Closed)` after `abort` and `Err(Error::Exited)` after a panic on
    /// runtimes that do not propagate the panic.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that spawns a future supplied later, borrowing this context
    /// mutably instead of consuming it.
    ///
    /// NOTE(review): the `*_duplicate` tests expect a second spawn from the same context to
    /// panic on the deterministic runtime — confirm for other implementations.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Consumes this context and spawns the synchronous closure `f`, which receives it.
    ///
    /// NOTE(review): `dedicated` presumably requests a dedicated thread — confirm against the
    /// implementations. `test_spawn_blocking_abort` expects the closure's result to still be
    /// delivered after `abort` is called on the handle.
    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> T + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that spawns a synchronous closure supplied later, borrowing this
    /// context mutably instead of consuming it.
    fn spawn_blocking_ref<F, T>(
        &mut self,
        dedicated: bool,
    ) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Requests shutdown with `value`.
    ///
    /// The tests in this file expect: the future resolves to `Ok(())` once tasks have
    /// dropped their signals; it resolves to `Err(Error::Timeout)` when `timeout` elapses
    /// first; and after several calls, `stopped` still reports the first `value`.
    fn stop(
        self,
        value: i32,
        timeout: Option<Duration>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns a signal that, per the tests in this file, resolves to the value passed to
    /// `stop`, including when it is requested after `stop` was called.
    fn stopped(&self) -> signal::Signal;
}
212
213pub trait Metrics: Clone + Send + Sync + 'static {
215 fn label(&self) -> String;
217
218 fn with_label(&self, label: &str) -> Self;
226
227 fn scoped_label(&self, label: &str) -> String {
231 let label = if self.label().is_empty() {
232 label.to_string()
233 } else {
234 format!("{}_{}", self.label(), label)
235 };
236 assert!(
237 !label.starts_with(METRICS_PREFIX),
238 "using runtime label is not allowed"
239 );
240 label
241 }
242
243 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
247
248 fn encode(&self) -> String;
250}
251
/// Interface for reading the time and sleeping.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by this clock.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`; `test_clock_sleep` expects at
    /// least `duration` to have passed on this clock when it resolves.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` is reached.
    /// NOTE(review): behavior for a deadline already in the past is not visible here.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
267
/// The [`Sink`] type of the [`Listener`] associated with network `N`.
pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;

/// The [`Stream`] type of the [`Listener`] associated with network `N`.
pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;

/// The [`Listener`] type associated with network `N`.
pub type ListenerOf<N> = <N as crate::Network>::Listener;
276
/// Interface for accepting and opening connections addressed by `SocketAddr`.
pub trait Network: Clone + Send + Sync + 'static {
    /// Listener type returned by [`Network::bind`]; it also fixes the sink and stream
    /// types returned by [`Network::dial`].
    type Listener: Listener;

    /// Binds to `socket` and resolves to a listener, or to an [`Error`] on failure.
    fn bind(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;

    /// Connects to `socket` and resolves to the sink/stream pair for that connection.
    fn dial(
        &self,
        socket: SocketAddr,
    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
}
297
/// Interface for accepting incoming connections.
pub trait Listener: Sync + Send + 'static {
    /// Sending half produced for each accepted connection.
    type Sink: Sink;
    /// Receiving half produced for each accepted connection.
    type Stream: Stream;

    /// Resolves to the peer address plus the sink and stream of the accepted connection.
    fn accept(
        &mut self,
    ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;

    /// Returns the address this listener is bound to, or the underlying I/O error.
    fn local_addr(&self) -> Result<SocketAddr, std::io::Error>;
}
316
/// Sending half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends `msg`, which may be anything convertible into a `StableBuf`.
    /// NOTE(review): whether the whole message is written before resolving is not visible
    /// here — confirm against the implementations.
    fn send(
        &mut self,
        msg: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
326
/// Receiving half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives into `buf` and resolves to a `StableBuf` on success.
    /// NOTE(review): presumably fills `buf` completely and hands the same buffer back —
    /// confirm against the implementations.
    fn recv(
        &mut self,
        buf: impl Into<StableBuf> + Send,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;
}
337
/// Interface for blob storage organized into named partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob handle type returned by [`Storage::open`].
    type Blob: Blob;

    /// Opens the blob `name` in `partition`, resolving to the blob and a `u64`.
    ///
    /// The tests in this file open blobs that do not exist yet and expect success, and
    /// expect the `u64` to equal the blob's length when it is reopened.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition`; with `None`, the tests in this file expect
    /// the partition itself to be removed.
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Lists the names of the blobs in `partition`; the tests in this file expect
    /// `Error::PartitionMissing` once the partition has been removed.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
372
/// Interface for reading from and writing to a single blob.
// NOTE(review): no `len` method is visible in this trait, so this `allow` may be stale — confirm.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads into `buf` starting at `offset` and resolves to the buffer.
    ///
    /// The tests in this file expect an error when the requested range extends past the
    /// end of the blob.
    fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<StableBuf, Error>> + Send;

    /// Writes `buf` starting at `offset`; the tests in this file rely on writes past the
    /// current end growing the blob.
    fn write_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Sets the blob's length to `len`; `test_blob_resize` expects extended bytes to read
    /// as zero and truncation to keep the leading bytes.
    fn resize(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Synchronizes the blob.
    /// NOTE(review): presumably flushes pending writes to durable storage — confirm
    /// against the implementations.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use bytes::Bytes;
420 use commonware_macros::select;
421 use futures::{
422 channel::{mpsc, oneshot},
423 future::{pending, ready},
424 join, pin_mut, FutureExt, SinkExt, StreamExt,
425 };
426 use prometheus_client::metrics::counter::Counter;
427 use std::{
428 collections::HashMap,
429 panic::{catch_unwind, AssertUnwindSafe},
430 str::FromStr,
431 sync::{
432 atomic::{AtomicU32, Ordering},
433 Arc, Mutex,
434 },
435 };
436 use tracing::{error, Level};
437 use utils::reschedule;
438
439 fn test_error_future<R: Runner>(runner: R) {
440 async fn error_future() -> Result<&'static str, &'static str> {
441 Err("An error occurred")
442 }
443 let result = runner.start(|_| error_future());
444 assert_eq!(result, Err("An error occurred"));
445 }
446
447 fn test_clock_sleep<R: Runner>(runner: R)
448 where
449 R::Context: Spawner + Clock,
450 {
451 runner.start(|context| async move {
452 let start = context.current();
454 let sleep_duration = Duration::from_millis(10);
455 context.sleep(sleep_duration).await;
456
457 let end = context.current();
459 assert!(end.duration_since(start).unwrap() >= sleep_duration);
460 });
461 }
462
463 fn test_clock_sleep_until<R: Runner>(runner: R)
464 where
465 R::Context: Spawner + Clock,
466 {
467 runner.start(|context| async move {
468 let now = context.current();
470 context.sleep_until(now + Duration::from_millis(100)).await;
471
472 let elapsed = now.elapsed().unwrap();
474 assert!(elapsed >= Duration::from_millis(100));
475 });
476 }
477
478 fn test_root_finishes<R: Runner>(runner: R)
479 where
480 R::Context: Spawner,
481 {
482 runner.start(|context| async move {
483 context.spawn(|_| async move {
484 loop {
485 reschedule().await;
486 }
487 });
488 });
489 }
490
491 fn test_spawn_abort<R: Runner>(runner: R)
492 where
493 R::Context: Spawner,
494 {
495 runner.start(|context| async move {
496 let handle = context.spawn(|_| async move {
497 loop {
498 reschedule().await;
499 }
500 });
501 handle.abort();
502 assert!(matches!(handle.await, Err(Error::Closed)));
503 });
504 }
505
506 fn test_panic_aborts_root<R: Runner>(runner: R) {
507 let result = catch_unwind(AssertUnwindSafe(|| {
508 runner.start(|_| async move {
509 panic!("blah");
510 });
511 }));
512 result.unwrap_err();
513 }
514
515 fn test_panic_aborts_spawn<R: Runner>(runner: R)
516 where
517 R::Context: Spawner,
518 {
519 let result = runner.start(|context| async move {
520 let result = context.spawn(|_| async move {
521 panic!("blah");
522 });
523 assert!(matches!(result.await, Err(Error::Exited)));
524 Result::<(), Error>::Ok(())
525 });
526
527 result.unwrap();
529 }
530
531 fn test_select<R: Runner>(runner: R) {
532 runner.start(|_| async move {
533 let output = Mutex::new(0);
535 select! {
536 v1 = ready(1) => {
537 *output.lock().unwrap() = v1;
538 },
539 v2 = ready(2) => {
540 *output.lock().unwrap() = v2;
541 },
542 };
543 assert_eq!(*output.lock().unwrap(), 1);
544
545 select! {
547 v1 = std::future::pending::<i32>() => {
548 *output.lock().unwrap() = v1;
549 },
550 v2 = ready(2) => {
551 *output.lock().unwrap() = v2;
552 },
553 };
554 assert_eq!(*output.lock().unwrap(), 2);
555 });
556 }
557
558 fn test_select_loop<R: Runner>(runner: R)
560 where
561 R::Context: Clock,
562 {
563 runner.start(|context| async move {
564 let (mut sender, mut receiver) = mpsc::unbounded();
566 for _ in 0..2 {
567 select! {
568 v = receiver.next() => {
569 panic!("unexpected value: {v:?}");
570 },
571 _ = context.sleep(Duration::from_millis(100)) => {
572 continue;
573 },
574 };
575 }
576
577 sender.send(0).await.unwrap();
579 sender.send(1).await.unwrap();
580
581 select! {
583 _ = async {} => {
584 },
586 v = receiver.next() => {
587 panic!("unexpected value: {v:?}");
588 },
589 };
590
591 for i in 0..2 {
593 select! {
594 _ = context.sleep(Duration::from_millis(100)) => {
595 panic!("timeout");
596 },
597 v = receiver.next() => {
598 assert_eq!(v.unwrap(), i);
599 },
600 };
601 }
602 });
603 }
604
605 fn test_storage_operations<R: Runner>(runner: R)
606 where
607 R::Context: Storage,
608 {
609 runner.start(|context| async move {
610 let partition = "test_partition";
611 let name = b"test_blob";
612
613 let (blob, _) = context
615 .open(partition, name)
616 .await
617 .expect("Failed to open blob");
618
619 let data = b"Hello, Storage!";
621 blob.write_at(Vec::from(data), 0)
622 .await
623 .expect("Failed to write to blob");
624
625 blob.sync().await.expect("Failed to sync blob");
627
628 let read = blob
630 .read_at(vec![0; data.len()], 0)
631 .await
632 .expect("Failed to read from blob");
633 assert_eq!(read.as_ref(), data);
634
635 blob.sync().await.expect("Failed to sync blob");
637
638 let blobs = context
640 .scan(partition)
641 .await
642 .expect("Failed to scan partition");
643 assert!(blobs.contains(&name.to_vec()));
644
645 let (blob, len) = context
647 .open(partition, name)
648 .await
649 .expect("Failed to reopen blob");
650 assert_eq!(len, data.len() as u64);
651
652 let read = blob
654 .read_at(vec![0u8; 7], 7)
655 .await
656 .expect("Failed to read data");
657 assert_eq!(read.as_ref(), b"Storage");
658
659 blob.sync().await.expect("Failed to sync blob");
661
662 context
664 .remove(partition, Some(name))
665 .await
666 .expect("Failed to remove blob");
667
668 let blobs = context
670 .scan(partition)
671 .await
672 .expect("Failed to scan partition");
673 assert!(!blobs.contains(&name.to_vec()));
674
675 context
677 .remove(partition, None)
678 .await
679 .expect("Failed to remove partition");
680
681 let result = context.scan(partition).await;
683 assert!(matches!(result, Err(Error::PartitionMissing(_))));
684 });
685 }
686
687 fn test_blob_read_write<R: Runner>(runner: R)
688 where
689 R::Context: Storage,
690 {
691 runner.start(|context| async move {
692 let partition = "test_partition";
693 let name = b"test_blob_rw";
694
695 let (blob, _) = context
697 .open(partition, name)
698 .await
699 .expect("Failed to open blob");
700
701 let data1 = b"Hello";
703 let data2 = b"World";
704 blob.write_at(Vec::from(data1), 0)
705 .await
706 .expect("Failed to write data1");
707 blob.write_at(Vec::from(data2), 5)
708 .await
709 .expect("Failed to write data2");
710
711 let read = blob
713 .read_at(vec![0u8; 10], 0)
714 .await
715 .expect("Failed to read data");
716 assert_eq!(&read.as_ref()[..5], data1);
717 assert_eq!(&read.as_ref()[5..], data2);
718
719 let result = blob.read_at(vec![0u8; 10], 10).await;
721 assert!(result.is_err());
722
723 let data3 = b"Store";
725 blob.write_at(Vec::from(data3), 5)
726 .await
727 .expect("Failed to write data3");
728
729 let read = blob
731 .read_at(vec![0u8; 10], 0)
732 .await
733 .expect("Failed to read data");
734 assert_eq!(&read.as_ref()[..5], data1);
735 assert_eq!(&read.as_ref()[5..], data3);
736
737 let result = blob.read_at(vec![0u8; 10], 10).await;
739 assert!(result.is_err());
740 });
741 }
742
743 fn test_blob_resize<R: Runner>(runner: R)
744 where
745 R::Context: Storage,
746 {
747 runner.start(|context| async move {
748 let partition = "test_partition_resize";
749 let name = b"test_blob_resize";
750
751 let (blob, _) = context
753 .open(partition, name)
754 .await
755 .expect("Failed to open blob");
756
757 let data = b"some data";
758 blob.write_at(data.to_vec(), 0)
759 .await
760 .expect("Failed to write");
761 blob.sync().await.expect("Failed to sync after write");
762
763 let (blob, len) = context.open(partition, name).await.unwrap();
765 assert_eq!(len, data.len() as u64);
766
767 let new_len = (data.len() as u64) * 2;
769 blob.resize(new_len)
770 .await
771 .expect("Failed to resize to extend");
772 blob.sync().await.expect("Failed to sync after resize");
773
774 let (blob, len) = context.open(partition, name).await.unwrap();
776 assert_eq!(len, new_len);
777
778 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
780 assert_eq!(read_buf.as_ref(), data);
781
782 let extended_part = blob
784 .read_at(vec![0; data.len()], data.len() as u64)
785 .await
786 .unwrap();
787 assert_eq!(extended_part.as_ref(), vec![0; data.len()].as_slice());
788
789 blob.resize(data.len() as u64).await.unwrap();
791 blob.sync().await.unwrap();
792
793 let (blob, size) = context.open(partition, name).await.unwrap();
795 assert_eq!(size, data.len() as u64);
796
797 let read_buf = blob.read_at(vec![0; data.len()], 0).await.unwrap();
799 assert_eq!(read_buf.as_ref(), data);
800 blob.sync().await.unwrap();
801 });
802 }
803
804 fn test_many_partition_read_write<R: Runner>(runner: R)
805 where
806 R::Context: Storage,
807 {
808 runner.start(|context| async move {
809 let partitions = ["partition1", "partition2", "partition3"];
810 let name = b"test_blob_rw";
811 let data1 = b"Hello";
812 let data2 = b"World";
813
814 for (additional, partition) in partitions.iter().enumerate() {
815 let (blob, _) = context
817 .open(partition, name)
818 .await
819 .expect("Failed to open blob");
820
821 blob.write_at(Vec::from(data1), 0)
823 .await
824 .expect("Failed to write data1");
825 blob.write_at(Vec::from(data2), 5 + additional as u64)
826 .await
827 .expect("Failed to write data2");
828
829 blob.sync().await.expect("Failed to sync blob");
831 }
832
833 for (additional, partition) in partitions.iter().enumerate() {
834 let (blob, len) = context
836 .open(partition, name)
837 .await
838 .expect("Failed to open blob");
839 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
840
841 let read = blob
843 .read_at(vec![0u8; 10 + additional], 0)
844 .await
845 .expect("Failed to read data");
846 assert_eq!(&read.as_ref()[..5], b"Hello");
847 assert_eq!(&read.as_ref()[5 + additional..], b"World");
848 }
849 });
850 }
851
852 fn test_blob_read_past_length<R: Runner>(runner: R)
853 where
854 R::Context: Storage,
855 {
856 runner.start(|context| async move {
857 let partition = "test_partition";
858 let name = b"test_blob_rw";
859
860 let (blob, _) = context
862 .open(partition, name)
863 .await
864 .expect("Failed to open blob");
865
866 let result = blob.read_at(vec![0u8; 10], 0).await;
868 assert!(result.is_err());
869
870 let data = b"Hello, Storage!".to_vec();
872 blob.write_at(data, 0)
873 .await
874 .expect("Failed to write to blob");
875
876 let result = blob.read_at(vec![0u8; 20], 0).await;
878 assert!(result.is_err());
879 })
880 }
881
882 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
883 where
884 R::Context: Spawner + Storage + Metrics,
885 {
886 runner.start(|context| async move {
887 let partition = "test_partition";
888 let name = b"test_blob_rw";
889
890 let (blob, _) = context
892 .open(partition, name)
893 .await
894 .expect("Failed to open blob");
895
896 let data = b"Hello, Storage!";
898 blob.write_at(Vec::from(data), 0)
899 .await
900 .expect("Failed to write to blob");
901
902 blob.sync().await.expect("Failed to sync blob");
904
905 let check1 = context.with_label("check1").spawn({
907 let blob = blob.clone();
908 move |_| async move {
909 let read = blob
910 .read_at(vec![0u8; data.len()], 0)
911 .await
912 .expect("Failed to read from blob");
913 assert_eq!(read.as_ref(), data);
914 }
915 });
916 let check2 = context.with_label("check2").spawn({
917 let blob = blob.clone();
918 move |_| async move {
919 let read = blob
920 .read_at(vec![0; data.len()], 0)
921 .await
922 .expect("Failed to read from blob");
923 assert_eq!(read.as_ref(), data);
924 }
925 });
926
927 let result = join!(check1, check2);
929 assert!(result.0.is_ok());
930 assert!(result.1.is_ok());
931
932 let read = blob
934 .read_at(vec![0; data.len()], 0)
935 .await
936 .expect("Failed to read from blob");
937 assert_eq!(read.as_ref(), data);
938
939 drop(blob);
941
942 let buffer = context.encode();
944 assert!(buffer.contains("open_blobs 0"));
945 });
946 }
947
948 fn test_shutdown<R: Runner>(runner: R)
949 where
950 R::Context: Spawner + Metrics + Clock,
951 {
952 let kill = 9;
953 runner.start(|context| async move {
954 let before = context
956 .with_label("before")
957 .spawn(move |context| async move {
958 let mut signal = context.stopped();
959 let value = (&mut signal).await.unwrap();
960 assert_eq!(value, kill);
961 drop(signal);
962 });
963
964 let result = context.clone().stop(kill, None).await;
966 assert!(result.is_ok());
967
968 let after = context
970 .with_label("after")
971 .spawn(move |context| async move {
972 let value = context.stopped().await.unwrap();
974 assert_eq!(value, kill);
975 });
976
977 let result = join!(before, after);
979 assert!(result.0.is_ok());
980 assert!(result.1.is_ok());
981 });
982 }
983
984 fn test_shutdown_multiple_signals<R: Runner>(runner: R)
985 where
986 R::Context: Spawner + Metrics + Clock,
987 {
988 let kill = 42;
989 runner.start(|context| async move {
990 let (started_tx, mut started_rx) = mpsc::channel(3);
991 let counter = Arc::new(AtomicU32::new(0));
992
993 let task = |cleanup_duration: Duration| {
996 let context = context.clone();
997 let counter = counter.clone();
998 let mut started_tx = started_tx.clone();
999 context.spawn(move |context| async move {
1000 let mut signal = context.stopped();
1002 started_tx.send(()).await.unwrap();
1003
1004 let value = (&mut signal).await.unwrap();
1006 assert_eq!(value, kill);
1007 context.sleep(cleanup_duration).await;
1008 counter.fetch_add(1, Ordering::SeqCst);
1009
1010 drop(signal);
1012 })
1013 };
1014
1015 let task1 = task(Duration::from_millis(10));
1016 let task2 = task(Duration::from_millis(20));
1017 let task3 = task(Duration::from_millis(30));
1018
1019 for _ in 0..3 {
1021 started_rx.next().await.unwrap();
1022 }
1023
1024 context.stop(kill, None).await.unwrap();
1026 assert_eq!(counter.load(Ordering::SeqCst), 3);
1027
1028 let result = join!(task1, task2, task3);
1030 assert!(result.0.is_ok());
1031 assert!(result.1.is_ok());
1032 assert!(result.2.is_ok());
1033 });
1034 }
1035
1036 fn test_shutdown_timeout<R: Runner>(runner: R)
1037 where
1038 R::Context: Spawner + Metrics + Clock,
1039 {
1040 let kill = 42;
1041 runner.start(|context| async move {
1042 let (started_tx, started_rx) = oneshot::channel();
1044
1045 context.clone().spawn(move |context| async move {
1047 let signal = context.stopped();
1048 started_tx.send(()).unwrap();
1049 pending::<()>().await;
1050 signal.await.unwrap();
1051 });
1052
1053 started_rx.await.unwrap();
1055 let result = context.stop(kill, Some(Duration::from_millis(100))).await;
1056
1057 assert!(matches!(result, Err(Error::Timeout)));
1059 });
1060 }
1061
1062 fn test_shutdown_multiple_stop_calls<R: Runner>(runner: R)
1063 where
1064 R::Context: Spawner + Metrics + Clock,
1065 {
1066 let kill1 = 42;
1067 let kill2 = 43;
1068
1069 runner.start(|context| async move {
1070 let (started_tx, started_rx) = oneshot::channel();
1071 let counter = Arc::new(AtomicU32::new(0));
1072
1073 let task = context.with_label("blocking_task").spawn({
1075 let counter = counter.clone();
1076 move |context| async move {
1077 let mut signal = context.stopped();
1079 started_tx.send(()).unwrap();
1080
1081 let value = (&mut signal).await.unwrap();
1083 assert_eq!(value, kill1);
1084 context.sleep(Duration::from_millis(50)).await;
1085
1086 counter.fetch_add(1, Ordering::SeqCst);
1088 drop(signal);
1089 }
1090 });
1091
1092 started_rx.await.unwrap();
1094
1095 let stop_task1 = context.clone().stop(kill1, None);
1098 pin_mut!(stop_task1);
1099 let stop_task2 = context.clone().stop(kill2, None);
1100 pin_mut!(stop_task2);
1101
1102 assert!(stop_task1.as_mut().now_or_never().is_none());
1104 assert!(stop_task2.as_mut().now_or_never().is_none());
1105
1106 assert!(stop_task1.await.is_ok());
1108 assert!(stop_task2.await.is_ok());
1109
1110 let sig = context.stopped().await;
1112 assert_eq!(sig.unwrap(), kill1);
1113
1114 let result = task.await;
1116 assert!(result.is_ok());
1117 assert_eq!(counter.load(Ordering::SeqCst), 1);
1118
1119 assert!(context.stop(kill2, None).now_or_never().unwrap().is_ok());
1121 });
1122 }
1123
1124 fn test_spawn_ref<R: Runner>(runner: R)
1125 where
1126 R::Context: Spawner,
1127 {
1128 runner.start(|mut context| async move {
1129 let handle = context.spawn_ref();
1130 let result = handle(async move { 42 }).await;
1131 assert!(matches!(result, Ok(42)));
1132 });
1133 }
1134
1135 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
1136 where
1137 R::Context: Spawner,
1138 {
1139 runner.start(|mut context| async move {
1140 let handle = context.spawn_ref();
1141 let result = handle(async move { 42 }).await;
1142 assert!(matches!(result, Ok(42)));
1143
1144 let handle = context.spawn_ref();
1146 let result = handle(async move { 42 }).await;
1147 assert!(matches!(result, Ok(42)));
1148 });
1149 }
1150
1151 fn test_spawn_duplicate<R: Runner>(runner: R)
1152 where
1153 R::Context: Spawner,
1154 {
1155 runner.start(|mut context| async move {
1156 let handle = context.spawn_ref();
1157 let result = handle(async move { 42 }).await;
1158 assert!(matches!(result, Ok(42)));
1159
1160 context.spawn(|_| async move { 42 });
1162 });
1163 }
1164
1165 fn test_spawn_blocking<R: Runner>(runner: R, dedicated: bool)
1166 where
1167 R::Context: Spawner,
1168 {
1169 runner.start(|context| async move {
1170 let handle = context.spawn_blocking(dedicated, |_| 42);
1171 let result = handle.await;
1172 assert!(matches!(result, Ok(42)));
1173 });
1174 }
1175
1176 fn test_spawn_blocking_ref<R: Runner>(runner: R, dedicated: bool)
1177 where
1178 R::Context: Spawner,
1179 {
1180 runner.start(|mut context| async move {
1181 let spawn = context.spawn_blocking_ref(dedicated);
1182 let handle = spawn(|| 42);
1183 let result = handle.await;
1184 assert!(matches!(result, Ok(42)));
1185 });
1186 }
1187
1188 fn test_spawn_blocking_ref_duplicate<R: Runner>(runner: R, dedicated: bool)
1189 where
1190 R::Context: Spawner,
1191 {
1192 runner.start(|mut context| async move {
1193 let spawn = context.spawn_blocking_ref(dedicated);
1194 let result = spawn(|| 42).await;
1195 assert!(matches!(result, Ok(42)));
1196
1197 context.spawn_blocking(dedicated, |_| 42);
1199 });
1200 }
1201
1202 fn test_spawn_blocking_abort<R: Runner>(runner: R, dedicated: bool)
1203 where
1204 R::Context: Spawner,
1205 {
1206 runner.start(|context| async move {
1207 let (sender, mut receiver) = oneshot::channel();
1209 let handle = context.spawn_blocking(dedicated, move |_| {
1210 loop {
1212 if receiver.try_recv().is_ok() {
1213 break;
1214 }
1215 }
1216
1217 let mut count = 0;
1219 loop {
1220 count += 1;
1221 if count >= 100_000_000 {
1222 break;
1223 }
1224 }
1225 count
1226 });
1227
1228 handle.abort();
1234 sender.send(()).unwrap();
1235
1236 assert!(matches!(handle.await, Ok(100_000_000)));
1238 });
1239 }
1240
1241 fn test_metrics<R: Runner>(runner: R)
1242 where
1243 R::Context: Metrics,
1244 {
1245 runner.start(|context| async move {
1246 assert_eq!(context.label(), "");
1248
1249 let counter = Counter::<u64>::default();
1251 context.register("test", "test", counter.clone());
1252
1253 counter.inc();
1255
1256 let buffer = context.encode();
1258 assert!(buffer.contains("test_total 1"));
1259
1260 let context = context.with_label("nested");
1262 let nested_counter = Counter::<u64>::default();
1263 context.register("test", "test", nested_counter.clone());
1264
1265 nested_counter.inc();
1267
1268 let buffer = context.encode();
1270 assert!(buffer.contains("nested_test_total 1"));
1271 assert!(buffer.contains("test_total 1"));
1272 });
1273 }
1274
1275 fn test_metrics_label<R: Runner>(runner: R)
1276 where
1277 R::Context: Metrics,
1278 {
1279 runner.start(|context| async move {
1280 context.with_label(METRICS_PREFIX);
1281 })
1282 }
1283
1284 #[test]
1285 fn test_deterministic_future() {
1286 let runner = deterministic::Runner::default();
1287 test_error_future(runner);
1288 }
1289
1290 #[test]
1291 fn test_deterministic_clock_sleep() {
1292 let executor = deterministic::Runner::default();
1293 test_clock_sleep(executor);
1294 }
1295
1296 #[test]
1297 fn test_deterministic_clock_sleep_until() {
1298 let executor = deterministic::Runner::default();
1299 test_clock_sleep_until(executor);
1300 }
1301
1302 #[test]
1303 fn test_deterministic_root_finishes() {
1304 let executor = deterministic::Runner::default();
1305 test_root_finishes(executor);
1306 }
1307
1308 #[test]
1309 fn test_deterministic_spawn_abort() {
1310 let executor = deterministic::Runner::default();
1311 test_spawn_abort(executor);
1312 }
1313
1314 #[test]
1315 fn test_deterministic_panic_aborts_root() {
1316 let runner = deterministic::Runner::default();
1317 test_panic_aborts_root(runner);
1318 }
1319
1320 #[test]
1321 #[should_panic(expected = "blah")]
1322 fn test_deterministic_panic_aborts_spawn() {
1323 let executor = deterministic::Runner::default();
1324 test_panic_aborts_spawn(executor);
1325 }
1326
1327 #[test]
1328 fn test_deterministic_select() {
1329 let executor = deterministic::Runner::default();
1330 test_select(executor);
1331 }
1332
1333 #[test]
1334 fn test_deterministic_select_loop() {
1335 let executor = deterministic::Runner::default();
1336 test_select_loop(executor);
1337 }
1338
1339 #[test]
1340 fn test_deterministic_storage_operations() {
1341 let executor = deterministic::Runner::default();
1342 test_storage_operations(executor);
1343 }
1344
1345 #[test]
1346 fn test_deterministic_blob_read_write() {
1347 let executor = deterministic::Runner::default();
1348 test_blob_read_write(executor);
1349 }
1350
1351 #[test]
1352 fn test_deterministic_blob_resize() {
1353 let executor = deterministic::Runner::default();
1354 test_blob_resize(executor);
1355 }
1356
1357 #[test]
1358 fn test_deterministic_many_partition_read_write() {
1359 let executor = deterministic::Runner::default();
1360 test_many_partition_read_write(executor);
1361 }
1362
1363 #[test]
1364 fn test_deterministic_blob_read_past_length() {
1365 let executor = deterministic::Runner::default();
1366 test_blob_read_past_length(executor);
1367 }
1368
1369 #[test]
1370 fn test_deterministic_blob_clone_and_concurrent_read() {
1371 let executor = deterministic::Runner::default();
1373 test_blob_clone_and_concurrent_read(executor);
1374 }
1375
1376 #[test]
1377 fn test_deterministic_shutdown() {
1378 let executor = deterministic::Runner::default();
1379 test_shutdown(executor);
1380 }
1381
1382 #[test]
1383 fn test_deterministic_shutdown_multiple_signals() {
1384 let executor = deterministic::Runner::default();
1385 test_shutdown_multiple_signals(executor);
1386 }
1387
1388 #[test]
1389 fn test_deterministic_shutdown_timeout() {
1390 let executor = deterministic::Runner::default();
1391 test_shutdown_timeout(executor);
1392 }
1393
1394 #[test]
1395 fn test_deterministic_shutdown_multiple_stop_calls() {
1396 let executor = deterministic::Runner::default();
1397 test_shutdown_multiple_stop_calls(executor);
1398 }
1399
1400 #[test]
1401 fn test_deterministic_spawn_ref() {
1402 let executor = deterministic::Runner::default();
1403 test_spawn_ref(executor);
1404 }
1405
1406 #[test]
1407 #[should_panic]
1408 fn test_deterministic_spawn_ref_duplicate() {
1409 let executor = deterministic::Runner::default();
1410 test_spawn_ref_duplicate(executor);
1411 }
1412
1413 #[test]
1414 #[should_panic]
1415 fn test_deterministic_spawn_duplicate() {
1416 let executor = deterministic::Runner::default();
1417 test_spawn_duplicate(executor);
1418 }
1419
1420 #[test]
1421 fn test_deterministic_spawn_blocking() {
1422 for dedicated in [false, true] {
1423 let executor = deterministic::Runner::default();
1424 test_spawn_blocking(executor, dedicated);
1425 }
1426 }
1427
1428 #[test]
1429 #[should_panic(expected = "blocking task panicked")]
1430 fn test_deterministic_spawn_blocking_panic() {
1431 for dedicated in [false, true] {
1432 let executor = deterministic::Runner::default();
1433 executor.start(|context| async move {
1434 let handle = context.spawn_blocking(dedicated, |_| {
1435 panic!("blocking task panicked");
1436 });
1437 handle.await.unwrap();
1438 });
1439 }
1440 }
1441
1442 #[test]
1443 fn test_deterministic_spawn_blocking_abort() {
1444 for dedicated in [false, true] {
1445 let executor = deterministic::Runner::default();
1446 test_spawn_blocking_abort(executor, dedicated);
1447 }
1448 }
1449
1450 #[test]
1451 fn test_deterministic_spawn_blocking_ref() {
1452 for dedicated in [false, true] {
1453 let executor = deterministic::Runner::default();
1454 test_spawn_blocking_ref(executor, dedicated);
1455 }
1456 }
1457
1458 #[test]
1459 #[should_panic]
1460 fn test_deterministic_spawn_blocking_ref_duplicate() {
1461 for dedicated in [false, true] {
1462 let executor = deterministic::Runner::default();
1463 test_spawn_blocking_ref_duplicate(executor, dedicated);
1464 }
1465 }
1466
1467 #[test]
1468 fn test_deterministic_metrics() {
1469 let executor = deterministic::Runner::default();
1470 test_metrics(executor);
1471 }
1472
1473 #[test]
1474 #[should_panic]
1475 fn test_deterministic_metrics_label() {
1476 let executor = deterministic::Runner::default();
1477 test_metrics_label(executor);
1478 }
1479
1480 #[test]
1481 fn test_tokio_error_future() {
1482 let runner = tokio::Runner::default();
1483 test_error_future(runner);
1484 }
1485
1486 #[test]
1487 fn test_tokio_clock_sleep() {
1488 let executor = tokio::Runner::default();
1489 test_clock_sleep(executor);
1490 }
1491
1492 #[test]
1493 fn test_tokio_clock_sleep_until() {
1494 let executor = tokio::Runner::default();
1495 test_clock_sleep_until(executor);
1496 }
1497
1498 #[test]
1499 fn test_tokio_root_finishes() {
1500 let executor = tokio::Runner::default();
1501 test_root_finishes(executor);
1502 }
1503
1504 #[test]
1505 fn test_tokio_spawn_abort() {
1506 let executor = tokio::Runner::default();
1507 test_spawn_abort(executor);
1508 }
1509
1510 #[test]
1511 fn test_tokio_panic_aborts_root() {
1512 let executor = tokio::Runner::default();
1513 test_panic_aborts_root(executor);
1514 }
1515
1516 #[test]
1517 fn test_tokio_panic_aborts_spawn() {
1518 let executor = tokio::Runner::default();
1519 test_panic_aborts_spawn(executor);
1520 }
1521
/// Runs the shared `test_select` scenario on a default tokio runner.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1527
/// Runs the shared `test_select_loop` scenario on a default tokio runner.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1533
/// Runs the shared `test_storage_operations` scenario on a default tokio runner.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1539
/// Runs the shared `test_blob_read_write` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1545
/// Runs the shared `test_blob_resize` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_resize() {
    test_blob_resize(tokio::Runner::default());
}
1551
/// Runs the shared `test_many_partition_read_write` scenario on a default tokio runner.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1557
/// Runs the shared `test_blob_read_past_length` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1563
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on a default tokio runner.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1570
/// Runs the shared `test_shutdown` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1576
/// Runs the shared `test_shutdown_multiple_signals` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_signals() {
    test_shutdown_multiple_signals(tokio::Runner::default());
}
1582
/// Runs the shared `test_shutdown_timeout` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_timeout() {
    test_shutdown_timeout(tokio::Runner::default());
}
1588
/// Runs the shared `test_shutdown_multiple_stop_calls` scenario on a default tokio runner.
#[test]
fn test_tokio_shutdown_multiple_stop_calls() {
    test_shutdown_multiple_stop_calls(tokio::Runner::default());
}
1594
/// Runs the shared `test_spawn_ref` scenario on a default tokio runner.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1600
/// Runs the shared `test_spawn_ref_duplicate` scenario on a default tokio runner.
///
/// Marked `#[should_panic]`: the test passes only if the scenario panics.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1607
/// Runs the shared `test_spawn_duplicate` scenario on a default tokio runner.
///
/// Marked `#[should_panic]`: the test passes only if the scenario panics.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1614
/// Runs the shared `test_spawn_blocking` scenario on a fresh default tokio runner, once
/// with `dedicated = false` and once with `dedicated = true`.
#[test]
fn test_tokio_spawn_blocking() {
    for &dedicated in &[false, true] {
        test_spawn_blocking(tokio::Runner::default(), dedicated);
    }
}
1622
/// Spawns a blocking task that panics on a fresh default tokio runner (for each
/// `dedicated` value) and checks that awaiting its handle yields `Err(Error::Exited)`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    for &dedicated in &[false, true] {
        tokio::Runner::default().start(|ctx| async move {
            // The closure panics immediately; the handle must surface that as `Exited`.
            let outcome = ctx
                .spawn_blocking(dedicated, |_| {
                    panic!("blocking task panicked");
                })
                .await;
            assert!(matches!(outcome, Err(Error::Exited)));
        });
    }
}
1636
/// Runs the shared `test_spawn_blocking_abort` scenario on a fresh default tokio runner,
/// once with `dedicated = false` and once with `dedicated = true`.
#[test]
fn test_tokio_spawn_blocking_abort() {
    for &dedicated in &[false, true] {
        test_spawn_blocking_abort(tokio::Runner::default(), dedicated);
    }
}
1644
/// Runs the shared `test_spawn_blocking_ref` scenario on a fresh default tokio runner,
/// once with `dedicated = false` and once with `dedicated = true`.
#[test]
fn test_tokio_spawn_blocking_ref() {
    for &dedicated in &[false, true] {
        test_spawn_blocking_ref(tokio::Runner::default(), dedicated);
    }
}
1652
/// Runs the shared `test_spawn_blocking_ref_duplicate` scenario on a fresh default tokio
/// runner for each `dedicated` value.
///
/// Marked `#[should_panic]`: the test passes only if the body panics. Because a panic
/// unwinds out of the loop, `dedicated = true` is reached only when the `false` iteration
/// does not panic.
#[test]
#[should_panic]
fn test_tokio_spawn_blocking_ref_duplicate() {
    for &dedicated in &[false, true] {
        test_spawn_blocking_ref_duplicate(tokio::Runner::default(), dedicated);
    }
}
1661
/// Runs the shared `test_metrics` scenario on a default tokio runner.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1667
/// Runs the shared `test_metrics_label` scenario on a default tokio runner.
///
/// Marked `#[should_panic]`: the test passes only if the scenario panics.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1674
/// End-to-end check of the tokio telemetry endpoint.
///
/// Initializes `tokio::telemetry` with a socket address, registers and increments a
/// counter, then dials that address with a hand-rolled HTTP/1.1 client, requests
/// `GET /metrics` and asserts that the response is `200 OK` and that the body contains
/// `test_counter_total 1`.
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Address handed to `telemetry::init` and dialed by the client below
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Configure telemetry
        tokio::telemetry::init(
            context.with_label("metrics"),
            tokio::telemetry::Logging {
                level: Level::INFO,
                json: false,
            },
            Some(address),
            None,
        );

        // Register a counter and bump it once so the scrape has something to find
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes one at a time until `\n`, returning the line without its
        /// trailing `\n` (and without a preceding `\r`, if present).
        ///
        /// Fails with `Error::ReadFailed` if the line is not valid UTF-8, or with the
        /// stream's own error if a read fails.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let byte = stream.recv(vec![0; 1]).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        // Drop the carriage return of a CRLF line ending
                        line.pop();
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines up to (and including) the empty line that ends the
        /// header section.
        ///
        /// Lines are split on the first `": "`; lines without that separator are
        /// ignored. Field names are stored in ASCII lowercase because HTTP field names
        /// are case-insensitive, so lookups must use lowercase keys.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((name, value)) = line.split_once(": ") {
                    headers.insert(name.to_ascii_lowercase(), value.to_string());
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let read = stream.recv(vec![0; content_length]).await?;
            String::from_utf8(read.into()).map_err(|_| Error::ReadFailed)
        }

        // Scrape the metrics endpoint from a separate task
        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry the dial until it succeeds
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Send the request
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n\r\n"
                );
                sink.send(Bytes::from(request).to_vec()).await.unwrap();

                // Check the status line
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Parse headers; keys are lowercase regardless of how the server cased them
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {headers:?}");
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Read the body and look for the counter
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        // Propagate any client-side failure to the test
        client_handle.await.unwrap();
    });
}
1782}