1use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23 future::Future,
24 net::SocketAddr,
25 time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32 if #[cfg(not(target_arch = "wasm32"))] {
33 pub mod tokio;
34 pub mod benchmarks;
35 }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41 create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
44const METRICS_PREFIX: &str = "runtime";
46
47#[derive(Error, Debug)]
49pub enum Error {
50 #[error("exited")]
51 Exited,
52 #[error("closed")]
53 Closed,
54 #[error("timeout")]
55 Timeout,
56 #[error("bind failed")]
57 BindFailed,
58 #[error("connection failed")]
59 ConnectionFailed,
60 #[error("write failed")]
61 WriteFailed,
62 #[error("read failed")]
63 ReadFailed,
64 #[error("send failed")]
65 SendFailed,
66 #[error("recv failed")]
67 RecvFailed,
68 #[error("partition creation failed: {0}")]
69 PartitionCreationFailed(String),
70 #[error("partition missing: {0}")]
71 PartitionMissing(String),
72 #[error("partition corrupt: {0}")]
73 PartitionCorrupt(String),
74 #[error("blob open failed: {0}/{1} error: {2}")]
75 BlobOpenFailed(String, String, IoError),
76 #[error("blob missing: {0}/{1}")]
77 BlobMissing(String, String),
78 #[error("blob truncate failed: {0}/{1} error: {2}")]
79 BlobTruncateFailed(String, String, IoError),
80 #[error("blob sync failed: {0}/{1} error: {2}")]
81 BlobSyncFailed(String, String, IoError),
82 #[error("blob close failed: {0}/{1} error: {2}")]
83 BlobCloseFailed(String, String, IoError),
84 #[error("blob insufficient length")]
85 BlobInsufficientLength,
86 #[error("offset overflow")]
87 OffsetOverflow,
88}
89
90pub trait Runner {
93 type Context;
99
100 fn start<F, Fut>(self, f: F) -> Fut::Output
102 where
103 F: FnOnce(Self::Context) -> Fut,
104 Fut: Future;
105}
106
107pub trait Spawner: Clone + Send + Sync + 'static {
109 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
117 where
118 F: FnOnce(Self) -> Fut + Send + 'static,
119 Fut: Future<Output = T> + Send + 'static,
120 T: Send + 'static;
121
122 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
135 where
136 F: Future<Output = T> + Send + 'static,
137 T: Send + 'static;
138
139 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
149 where
150 F: FnOnce() -> T + Send + 'static,
151 T: Send + 'static;
152
153 fn stop(&self, value: i32);
160
161 fn stopped(&self) -> Signal;
166}
167
168pub trait Metrics: Clone + Send + Sync + 'static {
170 fn label(&self) -> String;
172
173 fn with_label(&self, label: &str) -> Self;
181
182 fn scoped_label(&self, label: &str) -> String {
186 let label = if self.label().is_empty() {
187 label.to_string()
188 } else {
189 format!("{}_{}", self.label(), label)
190 };
191 assert!(
192 !label.starts_with(METRICS_PREFIX),
193 "using runtime label is not allowed"
194 );
195 label
196 }
197
198 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203 fn encode(&self) -> String;
205}
206
207pub trait Clock: Clone + Send + Sync + 'static {
213 fn current(&self) -> SystemTime;
215
216 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
218
219 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
221}
222
223pub type SinkOf<N> = <<N as Network>::Listener as Listener>::Sink;
225
226pub type StreamOf<N> = <<N as Network>::Listener as Listener>::Stream;
228
229pub trait Network: Clone + Send + Sync + 'static {
232 type Listener: Listener;
236
237 fn bind(
239 &self,
240 socket: SocketAddr,
241 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send;
242
243 fn dial(
245 &self,
246 socket: SocketAddr,
247 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send;
248}
249
250pub trait Listener: Sync + Send + 'static {
253 type Sink: Sink;
256 type Stream: Stream;
259
260 fn accept(
262 &mut self,
263 ) -> impl Future<Output = Result<(SocketAddr, Self::Sink, Self::Stream), Error>> + Send;
264}
265
266pub trait Sink: Sync + Send + 'static {
269 fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
271}
272
273pub trait Stream: Sync + Send + 'static {
276 fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
279}
280
281pub trait Storage: Clone + Send + Sync + 'static {
289 type Blob: Blob;
291
292 fn open(
298 &self,
299 partition: &str,
300 name: &[u8],
301 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
302
303 fn remove(
307 &self,
308 partition: &str,
309 name: Option<&[u8]>,
310 ) -> impl Future<Output = Result<(), Error>> + Send;
311
312 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
314}
315
316#[allow(clippy::len_without_is_empty)]
327pub trait Blob: Clone + Send + Sync + 'static {
328 fn read_at(
333 &self,
334 buf: &mut [u8],
335 offset: u64,
336 ) -> impl Future<Output = Result<(), Error>> + Send;
337
338 fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
340
341 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
343
344 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
346
347 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354 use commonware_macros::select;
355 use futures::channel::oneshot;
356 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
357 use prometheus_client::metrics::counter::Counter;
358 use std::collections::HashMap;
359 use std::panic::{catch_unwind, AssertUnwindSafe};
360 use std::str::FromStr;
361 use std::sync::Mutex;
362 use tracing::{error, Level};
363 use utils::reschedule;
364
365 fn test_error_future<R: Runner>(runner: R) {
366 async fn error_future() -> Result<&'static str, &'static str> {
367 Err("An error occurred")
368 }
369 let result = runner.start(|_| error_future());
370 assert_eq!(result, Err("An error occurred"));
371 }
372
373 fn test_clock_sleep<R: Runner>(runner: R)
374 where
375 R::Context: Spawner + Clock,
376 {
377 runner.start(|context| async move {
378 let start = context.current();
380 let sleep_duration = Duration::from_millis(10);
381 context.sleep(sleep_duration).await;
382
383 let end = context.current();
385 assert!(end.duration_since(start).unwrap() >= sleep_duration);
386 });
387 }
388
389 fn test_clock_sleep_until<R: Runner>(runner: R)
390 where
391 R::Context: Spawner + Clock,
392 {
393 runner.start(|context| async move {
394 let now = context.current();
396 context.sleep_until(now + Duration::from_millis(100)).await;
397
398 let elapsed = now.elapsed().unwrap();
400 assert!(elapsed >= Duration::from_millis(100));
401 });
402 }
403
404 fn test_root_finishes<R: Runner>(runner: R)
405 where
406 R::Context: Spawner,
407 {
408 runner.start(|context| async move {
409 context.spawn(|_| async move {
410 loop {
411 reschedule().await;
412 }
413 });
414 });
415 }
416
417 fn test_spawn_abort<R: Runner>(runner: R)
418 where
419 R::Context: Spawner,
420 {
421 runner.start(|context| async move {
422 let handle = context.spawn(|_| async move {
423 loop {
424 reschedule().await;
425 }
426 });
427 handle.abort();
428 assert!(matches!(handle.await, Err(Error::Closed)));
429 });
430 }
431
432 fn test_panic_aborts_root<R: Runner>(runner: R) {
433 let result = catch_unwind(AssertUnwindSafe(|| {
434 runner.start(|_| async move {
435 panic!("blah");
436 });
437 }));
438 result.unwrap_err();
439 }
440
441 fn test_panic_aborts_spawn<R: Runner>(runner: R)
442 where
443 R::Context: Spawner,
444 {
445 let result = runner.start(|context| async move {
446 let result = context.spawn(|_| async move {
447 panic!("blah");
448 });
449 assert!(matches!(result.await, Err(Error::Exited)));
450 Result::<(), Error>::Ok(())
451 });
452
453 result.unwrap();
455 }
456
457 fn test_select<R: Runner>(runner: R) {
458 runner.start(|_| async move {
459 let output = Mutex::new(0);
461 select! {
462 v1 = ready(1) => {
463 *output.lock().unwrap() = v1;
464 },
465 v2 = ready(2) => {
466 *output.lock().unwrap() = v2;
467 },
468 };
469 assert_eq!(*output.lock().unwrap(), 1);
470
471 select! {
473 v1 = std::future::pending::<i32>() => {
474 *output.lock().unwrap() = v1;
475 },
476 v2 = ready(2) => {
477 *output.lock().unwrap() = v2;
478 },
479 };
480 assert_eq!(*output.lock().unwrap(), 2);
481 });
482 }
483
484 fn test_select_loop<R: Runner>(runner: R)
486 where
487 R::Context: Clock,
488 {
489 runner.start(|context| async move {
490 let (mut sender, mut receiver) = mpsc::unbounded();
492 for _ in 0..2 {
493 select! {
494 v = receiver.next() => {
495 panic!("unexpected value: {:?}", v);
496 },
497 _ = context.sleep(Duration::from_millis(100)) => {
498 continue;
499 },
500 };
501 }
502
503 sender.send(0).await.unwrap();
505 sender.send(1).await.unwrap();
506
507 select! {
509 _ = async {} => {
510 },
512 v = receiver.next() => {
513 panic!("unexpected value: {:?}", v);
514 },
515 };
516
517 for i in 0..2 {
519 select! {
520 _ = context.sleep(Duration::from_millis(100)) => {
521 panic!("timeout");
522 },
523 v = receiver.next() => {
524 assert_eq!(v.unwrap(), i);
525 },
526 };
527 }
528 });
529 }
530
531 fn test_storage_operations<R: Runner>(runner: R)
532 where
533 R::Context: Storage,
534 {
535 runner.start(|context| async move {
536 let partition = "test_partition";
537 let name = b"test_blob";
538
539 let (blob, _) = context
541 .open(partition, name)
542 .await
543 .expect("Failed to open blob");
544
545 let data = b"Hello, Storage!";
547 blob.write_at(data, 0)
548 .await
549 .expect("Failed to write to blob");
550
551 blob.sync().await.expect("Failed to sync blob");
553
554 let mut buffer = vec![0u8; data.len()];
556 blob.read_at(&mut buffer, 0)
557 .await
558 .expect("Failed to read from blob");
559 assert_eq!(&buffer, data);
560
561 blob.close().await.expect("Failed to close blob");
563
564 let blobs = context
566 .scan(partition)
567 .await
568 .expect("Failed to scan partition");
569 assert!(blobs.contains(&name.to_vec()));
570
571 let (blob, len) = context
573 .open(partition, name)
574 .await
575 .expect("Failed to reopen blob");
576 assert_eq!(len, data.len() as u64);
577
578 let mut buffer = vec![0u8; 7];
580 blob.read_at(&mut buffer, 7)
581 .await
582 .expect("Failed to read data");
583 assert_eq!(&buffer, b"Storage");
584
585 blob.close().await.expect("Failed to close blob");
587
588 context
590 .remove(partition, Some(name))
591 .await
592 .expect("Failed to remove blob");
593
594 let blobs = context
596 .scan(partition)
597 .await
598 .expect("Failed to scan partition");
599 assert!(!blobs.contains(&name.to_vec()));
600
601 context
603 .remove(partition, None)
604 .await
605 .expect("Failed to remove partition");
606
607 let result = context.scan(partition).await;
609 assert!(matches!(result, Err(Error::PartitionMissing(_))));
610 });
611 }
612
613 fn test_blob_read_write<R: Runner>(runner: R)
614 where
615 R::Context: Storage,
616 {
617 runner.start(|context| async move {
618 let partition = "test_partition";
619 let name = b"test_blob_rw";
620
621 let (blob, _) = context
623 .open(partition, name)
624 .await
625 .expect("Failed to open blob");
626
627 let data1 = b"Hello";
629 let data2 = b"World";
630 blob.write_at(data1, 0)
631 .await
632 .expect("Failed to write data1");
633 blob.write_at(data2, 5)
634 .await
635 .expect("Failed to write data2");
636
637 let mut buffer = vec![0u8; 10];
639 blob.read_at(&mut buffer, 0)
640 .await
641 .expect("Failed to read data");
642 assert_eq!(&buffer[..5], data1);
643 assert_eq!(&buffer[5..], data2);
644
645 let data3 = b"Store";
647 blob.write_at(data3, 5)
648 .await
649 .expect("Failed to write data3");
650
651 blob.truncate(5).await.expect("Failed to truncate blob");
653 let mut buffer = vec![0u8; 5];
654 blob.read_at(&mut buffer, 0)
655 .await
656 .expect("Failed to read data");
657 assert_eq!(&buffer[..5], data1);
658
659 let mut buffer = vec![0u8; 10];
661 let result = blob.read_at(&mut buffer, 0).await;
662 assert!(result.is_err());
663
664 blob.close().await.expect("Failed to close blob");
666 });
667 }
668
669 fn test_many_partition_read_write<R: Runner>(runner: R)
670 where
671 R::Context: Storage,
672 {
673 runner.start(|context| async move {
674 let partitions = ["partition1", "partition2", "partition3"];
675 let name = b"test_blob_rw";
676 let data1 = b"Hello";
677 let data2 = b"World";
678
679 for (additional, partition) in partitions.iter().enumerate() {
680 let (blob, _) = context
682 .open(partition, name)
683 .await
684 .expect("Failed to open blob");
685
686 blob.write_at(data1, 0)
688 .await
689 .expect("Failed to write data1");
690 blob.write_at(data2, 5 + additional as u64)
691 .await
692 .expect("Failed to write data2");
693
694 blob.close().await.expect("Failed to close blob");
696 }
697
698 for (additional, partition) in partitions.iter().enumerate() {
699 let (blob, len) = context
701 .open(partition, name)
702 .await
703 .expect("Failed to open blob");
704 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
705
706 let mut buffer = vec![0u8; 10 + additional];
708 blob.read_at(&mut buffer, 0)
709 .await
710 .expect("Failed to read data");
711 assert_eq!(&buffer[..5], b"Hello");
712 assert_eq!(&buffer[5 + additional..], b"World");
713
714 blob.close().await.expect("Failed to close blob");
716 }
717 });
718 }
719
720 fn test_blob_read_past_length<R: Runner>(runner: R)
721 where
722 R::Context: Storage,
723 {
724 runner.start(|context| async move {
725 let partition = "test_partition";
726 let name = b"test_blob_rw";
727
728 let (blob, _) = context
730 .open(partition, name)
731 .await
732 .expect("Failed to open blob");
733
734 let mut buffer = vec![0u8; 10];
736 let result = blob.read_at(&mut buffer, 0).await;
737 assert!(result.is_err());
738
739 let data = b"Hello, Storage!";
741 blob.write_at(data, 0)
742 .await
743 .expect("Failed to write to blob");
744
745 let mut buffer = vec![0u8; 20];
747 let result = blob.read_at(&mut buffer, 0).await;
748 assert!(result.is_err());
749 })
750 }
751
752 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
753 where
754 R::Context: Spawner + Storage + Metrics,
755 {
756 runner.start(|context| async move {
757 let partition = "test_partition";
758 let name = b"test_blob_rw";
759
760 let (blob, _) = context
762 .open(partition, name)
763 .await
764 .expect("Failed to open blob");
765
766 let data = b"Hello, Storage!";
768 blob.write_at(data, 0)
769 .await
770 .expect("Failed to write to blob");
771
772 blob.sync().await.expect("Failed to sync blob");
774
775 let check1 = context.with_label("check1").spawn({
777 let blob = blob.clone();
778 move |_| async move {
779 let mut buffer = vec![0u8; data.len()];
780 blob.read_at(&mut buffer, 0)
781 .await
782 .expect("Failed to read from blob");
783 assert_eq!(&buffer, data);
784 }
785 });
786 let check2 = context.with_label("check2").spawn({
787 let blob = blob.clone();
788 move |_| async move {
789 let mut buffer = vec![0u8; data.len()];
790 blob.read_at(&mut buffer, 0)
791 .await
792 .expect("Failed to read from blob");
793 assert_eq!(&buffer, data);
794 }
795 });
796
797 let result = join!(check1, check2);
799 assert!(result.0.is_ok());
800 assert!(result.1.is_ok());
801
802 let mut buffer = vec![0u8; data.len()];
804 blob.read_at(&mut buffer, 0)
805 .await
806 .expect("Failed to read from blob");
807 assert_eq!(&buffer, data);
808
809 blob.close().await.expect("Failed to close blob");
811
812 let buffer = context.encode();
814 assert!(buffer.contains("open_blobs 0"));
815 });
816 }
817
818 fn test_shutdown<R: Runner>(runner: R)
819 where
820 R::Context: Spawner + Metrics + Clock,
821 {
822 let kill = 9;
823 runner.start(|context| async move {
824 let before = context
826 .with_label("before")
827 .spawn(move |context| async move {
828 let sig = context.stopped().await;
829 assert_eq!(sig.unwrap(), kill);
830 });
831
832 let after = context
834 .with_label("after")
835 .spawn(move |context| async move {
836 let mut signal = context.stopped();
838 loop {
839 select! {
840 sig = &mut signal => {
841 assert_eq!(sig.unwrap(), kill);
843 break;
844 },
845 _ = context.sleep(Duration::from_millis(10)) => {
846 },
848 }
849 }
850 });
851
852 context.sleep(Duration::from_millis(50)).await;
854
855 context.stop(kill);
857
858 let result = join!(before, after);
860 assert!(result.0.is_ok());
861 assert!(result.1.is_ok());
862 });
863 }
864
865 fn test_spawn_ref<R: Runner>(runner: R)
866 where
867 R::Context: Spawner,
868 {
869 runner.start(|mut context| async move {
870 let handle = context.spawn_ref();
871 let result = handle(async move { 42 }).await;
872 assert!(matches!(result, Ok(42)));
873 });
874 }
875
876 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
877 where
878 R::Context: Spawner,
879 {
880 runner.start(|mut context| async move {
881 let handle = context.spawn_ref();
882 let result = handle(async move { 42 }).await;
883 assert!(matches!(result, Ok(42)));
884
885 let handle = context.spawn_ref();
887 let result = handle(async move { 42 }).await;
888 assert!(matches!(result, Ok(42)));
889 });
890 }
891
892 fn test_spawn_duplicate<R: Runner>(runner: R)
893 where
894 R::Context: Spawner,
895 {
896 runner.start(|mut context| async move {
897 let handle = context.spawn_ref();
898 let result = handle(async move { 42 }).await;
899 assert!(matches!(result, Ok(42)));
900
901 context.spawn(|_| async move { 42 });
903 });
904 }
905
906 fn test_spawn_blocking<R: Runner>(runner: R)
907 where
908 R::Context: Spawner,
909 {
910 runner.start(|context| async move {
911 let handle = context.spawn_blocking(|| 42);
912 let result = handle.await;
913 assert!(matches!(result, Ok(42)));
914 });
915 }
916
917 fn test_spawn_blocking_abort<R: Runner>(runner: R)
918 where
919 R::Context: Spawner,
920 {
921 runner.start(|context| async move {
922 let (sender, mut receiver) = oneshot::channel();
924 let handle = context.spawn_blocking(move || {
925 loop {
927 if receiver.try_recv().is_ok() {
928 break;
929 }
930 }
931
932 let mut count = 0;
934 loop {
935 count += 1;
936 if count >= 100_000_000 {
937 break;
938 }
939 }
940 count
941 });
942
943 handle.abort();
949 sender.send(()).unwrap();
950
951 assert!(matches!(handle.await, Ok(100_000_000)));
953 });
954 }
955
956 fn test_metrics<R: Runner>(runner: R)
957 where
958 R::Context: Metrics,
959 {
960 runner.start(|context| async move {
961 assert_eq!(context.label(), "");
963
964 let counter = Counter::<u64>::default();
966 context.register("test", "test", counter.clone());
967
968 counter.inc();
970
971 let buffer = context.encode();
973 assert!(buffer.contains("test_total 1"));
974
975 let context = context.with_label("nested");
977 let nested_counter = Counter::<u64>::default();
978 context.register("test", "test", nested_counter.clone());
979
980 nested_counter.inc();
982
983 let buffer = context.encode();
985 assert!(buffer.contains("nested_test_total 1"));
986 assert!(buffer.contains("test_total 1"));
987 });
988 }
989
990 fn test_metrics_label<R: Runner>(runner: R)
991 where
992 R::Context: Metrics,
993 {
994 runner.start(|context| async move {
995 context.with_label(METRICS_PREFIX);
996 })
997 }
998
999 #[test]
1000 fn test_deterministic_future() {
1001 let runner = deterministic::Runner::default();
1002 test_error_future(runner);
1003 }
1004
1005 #[test]
1006 fn test_deterministic_clock_sleep() {
1007 let executor = deterministic::Runner::default();
1008 test_clock_sleep(executor);
1009 }
1010
1011 #[test]
1012 fn test_deterministic_clock_sleep_until() {
1013 let executor = deterministic::Runner::default();
1014 test_clock_sleep_until(executor);
1015 }
1016
1017 #[test]
1018 fn test_deterministic_root_finishes() {
1019 let executor = deterministic::Runner::default();
1020 test_root_finishes(executor);
1021 }
1022
1023 #[test]
1024 fn test_deterministic_spawn_abort() {
1025 let executor = deterministic::Runner::default();
1026 test_spawn_abort(executor);
1027 }
1028
1029 #[test]
1030 fn test_deterministic_panic_aborts_root() {
1031 let runner = deterministic::Runner::default();
1032 test_panic_aborts_root(runner);
1033 }
1034
1035 #[test]
1036 #[should_panic(expected = "blah")]
1037 fn test_deterministic_panic_aborts_spawn() {
1038 let executor = deterministic::Runner::default();
1039 test_panic_aborts_spawn(executor);
1040 }
1041
1042 #[test]
1043 fn test_deterministic_select() {
1044 let executor = deterministic::Runner::default();
1045 test_select(executor);
1046 }
1047
1048 #[test]
1049 fn test_deterministic_select_loop() {
1050 let executor = deterministic::Runner::default();
1051 test_select_loop(executor);
1052 }
1053
1054 #[test]
1055 fn test_deterministic_storage_operations() {
1056 let executor = deterministic::Runner::default();
1057 test_storage_operations(executor);
1058 }
1059
1060 #[test]
1061 fn test_deterministic_blob_read_write() {
1062 let executor = deterministic::Runner::default();
1063 test_blob_read_write(executor);
1064 }
1065
1066 #[test]
1067 fn test_deterministic_many_partition_read_write() {
1068 let executor = deterministic::Runner::default();
1069 test_many_partition_read_write(executor);
1070 }
1071
1072 #[test]
1073 fn test_deterministic_blob_read_past_length() {
1074 let executor = deterministic::Runner::default();
1075 test_blob_read_past_length(executor);
1076 }
1077
1078 #[test]
1079 fn test_deterministic_blob_clone_and_concurrent_read() {
1080 let executor = deterministic::Runner::default();
1082 test_blob_clone_and_concurrent_read(executor);
1083 }
1084
1085 #[test]
1086 fn test_deterministic_shutdown() {
1087 let executor = deterministic::Runner::default();
1088 test_shutdown(executor);
1089 }
1090
1091 #[test]
1092 fn test_deterministic_spawn_ref() {
1093 let executor = deterministic::Runner::default();
1094 test_spawn_ref(executor);
1095 }
1096
1097 #[test]
1098 #[should_panic]
1099 fn test_deterministic_spawn_ref_duplicate() {
1100 let executor = deterministic::Runner::default();
1101 test_spawn_ref_duplicate(executor);
1102 }
1103
1104 #[test]
1105 #[should_panic]
1106 fn test_deterministic_spawn_duplicate() {
1107 let executor = deterministic::Runner::default();
1108 test_spawn_duplicate(executor);
1109 }
1110
1111 #[test]
1112 fn test_deterministic_spawn_blocking() {
1113 let executor = deterministic::Runner::default();
1114 test_spawn_blocking(executor);
1115 }
1116
1117 #[test]
1118 #[should_panic(expected = "blocking task panicked")]
1119 fn test_deterministic_spawn_blocking_panic() {
1120 let executor = deterministic::Runner::default();
1121 executor.start(|context| async move {
1122 let handle = context.spawn_blocking(|| {
1123 panic!("blocking task panicked");
1124 });
1125 handle.await.unwrap();
1126 });
1127 }
1128
1129 #[test]
1130 fn test_deterministic_spawn_blocking_abort() {
1131 let executor = deterministic::Runner::default();
1132 test_spawn_blocking_abort(executor);
1133 }
1134
1135 #[test]
1136 fn test_deterministic_metrics() {
1137 let executor = deterministic::Runner::default();
1138 test_metrics(executor);
1139 }
1140
1141 #[test]
1142 #[should_panic]
1143 fn test_deterministic_metrics_label() {
1144 let executor = deterministic::Runner::default();
1145 test_metrics_label(executor);
1146 }
1147
1148 #[test]
1149 fn test_tokio_error_future() {
1150 let runner = tokio::Runner::default();
1151 test_error_future(runner);
1152 }
1153
1154 #[test]
1155 fn test_tokio_clock_sleep() {
1156 let executor = tokio::Runner::default();
1157 test_clock_sleep(executor);
1158 }
1159
1160 #[test]
1161 fn test_tokio_clock_sleep_until() {
1162 let executor = tokio::Runner::default();
1163 test_clock_sleep_until(executor);
1164 }
1165
1166 #[test]
1167 fn test_tokio_root_finishes() {
1168 let executor = tokio::Runner::default();
1169 test_root_finishes(executor);
1170 }
1171
1172 #[test]
1173 fn test_tokio_spawn_abort() {
1174 let executor = tokio::Runner::default();
1175 test_spawn_abort(executor);
1176 }
1177
1178 #[test]
1179 fn test_tokio_panic_aborts_root() {
1180 let executor = tokio::Runner::default();
1181 test_panic_aborts_root(executor);
1182 }
1183
1184 #[test]
1185 fn test_tokio_panic_aborts_spawn() {
1186 let executor = tokio::Runner::default();
1187 test_panic_aborts_spawn(executor);
1188 }
1189
1190 #[test]
1191 fn test_tokio_select() {
1192 let executor = tokio::Runner::default();
1193 test_select(executor);
1194 }
1195
1196 #[test]
1197 fn test_tokio_select_loop() {
1198 let executor = tokio::Runner::default();
1199 test_select_loop(executor);
1200 }
1201
1202 #[test]
1203 fn test_tokio_storage_operations() {
1204 let executor = tokio::Runner::default();
1205 test_storage_operations(executor);
1206 }
1207
1208 #[test]
1209 fn test_tokio_blob_read_write() {
1210 let executor = tokio::Runner::default();
1211 test_blob_read_write(executor);
1212 }
1213
1214 #[test]
1215 fn test_tokio_many_partition_read_write() {
1216 let executor = tokio::Runner::default();
1217 test_many_partition_read_write(executor);
1218 }
1219
1220 #[test]
1221 fn test_tokio_blob_read_past_length() {
1222 let executor = tokio::Runner::default();
1223 test_blob_read_past_length(executor);
1224 }
1225
1226 #[test]
1227 fn test_tokio_blob_clone_and_concurrent_read() {
1228 let executor = tokio::Runner::default();
1230 test_blob_clone_and_concurrent_read(executor);
1231 }
1232
1233 #[test]
1234 fn test_tokio_shutdown() {
1235 let executor = tokio::Runner::default();
1236 test_shutdown(executor);
1237 }
1238
1239 #[test]
1240 fn test_tokio_spawn_ref() {
1241 let executor = tokio::Runner::default();
1242 test_spawn_ref(executor);
1243 }
1244
1245 #[test]
1246 #[should_panic]
1247 fn test_tokio_spawn_ref_duplicate() {
1248 let executor = tokio::Runner::default();
1249 test_spawn_ref_duplicate(executor);
1250 }
1251
1252 #[test]
1253 #[should_panic]
1254 fn test_tokio_spawn_duplicate() {
1255 let executor = tokio::Runner::default();
1256 test_spawn_duplicate(executor);
1257 }
1258
1259 #[test]
1260 fn test_tokio_spawn_blocking() {
1261 let executor = tokio::Runner::default();
1262 test_spawn_blocking(executor);
1263 }
1264
1265 #[test]
1266 fn test_tokio_spawn_blocking_panic() {
1267 let executor = tokio::Runner::default();
1268 executor.start(|context| async move {
1269 let handle = context.spawn_blocking(|| {
1270 panic!("blocking task panicked");
1271 });
1272 let result = handle.await;
1273 assert!(matches!(result, Err(Error::Exited)));
1274 });
1275 }
1276
1277 #[test]
1278 fn test_tokio_spawn_blocking_abort() {
1279 let executor = tokio::Runner::default();
1280 test_spawn_blocking_abort(executor);
1281 }
1282
1283 #[test]
1284 fn test_tokio_metrics() {
1285 let executor = tokio::Runner::default();
1286 test_metrics(executor);
1287 }
1288
1289 #[test]
1290 #[should_panic]
1291 fn test_tokio_metrics_label() {
1292 let executor = tokio::Runner::default();
1293 test_metrics_label(executor);
1294 }
1295
1296 #[test]
1297 fn test_tokio_telemetry() {
1298 let executor = tokio::Runner::default();
1299 executor.start(|context| async move {
1300 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
1302
1303 tokio::telemetry::init(
1305 context.with_label("metrics"),
1306 tokio::telemetry::Logging {
1307 level: Level::INFO,
1308 json: false,
1309 },
1310 Some(address),
1311 None,
1312 );
1313
1314 let counter: Counter<u64> = Counter::default();
1316 context.register("test_counter", "Test counter", counter.clone());
1317 counter.inc();
1318
1319 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1321 let mut line = Vec::new();
1322 loop {
1323 let mut byte = [0; 1];
1324 stream.recv(&mut byte).await?;
1325 if byte[0] == b'\n' {
1326 if line.last() == Some(&b'\r') {
1327 line.pop(); }
1329 break;
1330 }
1331 line.push(byte[0]);
1332 }
1333 String::from_utf8(line).map_err(|_| Error::ReadFailed)
1334 }
1335
1336 async fn read_headers<St: Stream>(
1337 stream: &mut St,
1338 ) -> Result<HashMap<String, String>, Error> {
1339 let mut headers = HashMap::new();
1340 loop {
1341 let line = read_line(stream).await?;
1342 if line.is_empty() {
1343 break;
1344 }
1345 let parts: Vec<&str> = line.splitn(2, ": ").collect();
1346 if parts.len() == 2 {
1347 headers.insert(parts[0].to_string(), parts[1].to_string());
1348 }
1349 }
1350 Ok(headers)
1351 }
1352
1353 async fn read_body<St: Stream>(
1354 stream: &mut St,
1355 content_length: usize,
1356 ) -> Result<String, Error> {
1357 let mut body = vec![0; content_length];
1358 stream.recv(&mut body).await?;
1359 String::from_utf8(body).map_err(|_| Error::ReadFailed)
1360 }
1361
1362 let client_handle = context
1364 .with_label("client")
1365 .spawn(move |context| async move {
1366 let (mut sink, mut stream) = loop {
1367 match context.dial(address).await {
1368 Ok((sink, stream)) => break (sink, stream),
1369 Err(e) => {
1370 error!(err =?e, "failed to connect");
1372 context.sleep(Duration::from_millis(10)).await;
1373 }
1374 }
1375 };
1376
1377 let request = format!(
1379 "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
1380 address
1381 );
1382 sink.send(request.as_bytes()).await.unwrap();
1383
1384 let status_line = read_line(&mut stream).await.unwrap();
1386 assert_eq!(status_line, "HTTP/1.1 200 OK");
1387
1388 let headers = read_headers(&mut stream).await.unwrap();
1390 println!("Headers: {:?}", headers);
1391 let content_length = headers
1392 .get("content-length")
1393 .unwrap()
1394 .parse::<usize>()
1395 .unwrap();
1396
1397 let body = read_body(&mut stream, content_length).await.unwrap();
1399 assert!(body.contains("test_counter_total 1"));
1400 });
1401
1402 client_handle.await.unwrap();
1404 });
1405 }
1406}