1use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23 future::Future,
24 net::SocketAddr,
25 time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32 if #[cfg(not(target_arch = "wasm32"))] {
33 pub mod tokio;
34 pub mod benchmarks;
35 }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41 create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
/// Label prefix that callers may not use: [`Metrics::scoped_label`] panics
/// when a computed label starts with this string.
const METRICS_PREFIX: &str = "runtime";
46
/// Errors returned by the traits declared in this module.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute. For the `Blob*` variants carrying two `String`s, the message
/// renders them as `{0}/{1}` — presumably `partition/name`; verify against
/// the storage implementations.
#[derive(Error, Debug)]
pub enum Error {
    #[error("exited")]
    Exited,
    #[error("closed")]
    Closed,
    #[error("timeout")]
    Timeout,
    #[error("bind failed")]
    BindFailed,
    #[error("connection failed")]
    ConnectionFailed,
    #[error("write failed")]
    WriteFailed,
    #[error("read failed")]
    ReadFailed,
    #[error("send failed")]
    SendFailed,
    #[error("recv failed")]
    RecvFailed,
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    // The trailing `IoError` is the underlying `std::io::Error`.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    #[error("offset overflow")]
    OffsetOverflow,
}
89
/// Entry point that executes a root future on a runtime.
pub trait Runner {
    /// Value handed to the root closure; the tests in this file require it to
    /// implement traits such as [`Spawner`], [`Clock`], [`Storage`] or
    /// [`Metrics`] depending on the scenario.
    type Context;

    /// Consumes the runner, calls `f` with a [`Runner::Context`] to build the
    /// root future, and returns that future's output.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
106
/// Interface for spawning tasks and coordinating shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Consumes this context and spawns the future produced by `f`, which
    /// receives a `Self` value. Returns a [`Handle`] to the task's output.
    ///
    /// NOTE(review): the `*_spawn_duplicate` tests in this file expect a panic
    /// when a context that already spawned (via `spawn_ref`) spawns again.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that spawns a given future and yields its [`Handle`],
    /// borrowing the context mutably instead of consuming it.
    ///
    /// NOTE(review): the `*_spawn_ref_duplicate` tests expect a panic when this
    /// is used twice on the same context.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Consumes this context and runs the synchronous closure `f` as a task,
    /// returning a [`Handle`] to its result.
    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals a stop with the given `value`; the shutdown test in this file
    /// observes that value through [`Spawner::stopped`].
    fn stop(&self, value: i32);

    /// Returns a [`Signal`] that can be awaited to learn about a stop request.
    fn stopped(&self) -> Signal;
}
167
168pub trait Metrics: Clone + Send + Sync + 'static {
170 fn label(&self) -> String;
172
173 fn with_label(&self, label: &str) -> Self;
181
182 fn scoped_label(&self, label: &str) -> String {
186 let label = if self.label().is_empty() {
187 label.to_string()
188 } else {
189 format!("{}_{}", self.label(), label)
190 };
191 assert!(
192 !label.starts_with(METRICS_PREFIX),
193 "using runtime label is not allowed"
194 );
195 label
196 }
197
198 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203 fn encode(&self) -> String;
205}
206
/// Interface for reading the runtime's notion of time and sleeping.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time as seen by this runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
222
/// Interface for accepting and opening connections, parameterized by the
/// listener (`L`), sink (`Si`) and stream (`St`) types it produces.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
    L: Listener<Si, St>,
    Si: Sink,
    St: Stream,
{
    /// Binds to `socket` and resolves to a listener, or an [`Error`].
    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;

    /// Connects to `socket` and resolves to a `(sink, stream)` pair, or an
    /// [`Error`].
    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
237
/// Interface for accepting incoming connections.
pub trait Listener<Si, St>: Sync + Send + 'static
where
    Si: Sink,
    St: Stream,
{
    /// Resolves to the peer's address together with a `(sink, stream)` pair
    /// for the accepted connection, or an [`Error`].
    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
248
/// Write half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends the bytes in `msg`, resolving to `Ok(())` or an [`Error`].
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
255
/// Read half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives bytes into `buf`, resolving to `Ok(())` or an [`Error`].
    ///
    /// NOTE(review): no byte count is returned, and the telemetry test in this
    /// file relies on `buf` being filled completely — confirm with the
    /// implementations.
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
263
/// Interface for opening, removing and listing blobs grouped by partition.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob handle type produced by [`Storage::open`].
    type Blob: Blob;

    /// Opens the blob `name` in `partition`, resolving to the blob and a
    /// `u64`. The tests in this file show that opening a name that was not
    /// written before succeeds, and that on reopen the `u64` equals the number
    /// of bytes previously written.
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition` when `name` is `Some`. With
    /// `None`, the tests in this file show the partition itself is removed (a
    /// later `scan` yields `Error::PartitionMissing`).
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resolves to the names of the blobs in `partition`.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
298
// NOTE(review): no `len` method is declared on this trait in the visible
// source, so this `allow` may be stale — confirm before removing.
#[allow(clippy::len_without_is_empty)]
/// Handle to a stored blob supporting positional reads and writes.
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads into `buf` starting at `offset`. The tests in this file show that
    /// a read extending past the end of the blob resolves to an error.
    fn read_at(
        &self,
        buf: &mut [u8],
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Writes `buf` starting at `offset`.
    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Truncates the blob to `len` bytes.
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Syncs the blob; presumably flushes pending writes to durable storage —
    /// verify against the implementations.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Consumes this handle and closes the blob.
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use commonware_macros::select;
338 use futures::channel::oneshot;
339 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
340 use prometheus_client::metrics::counter::Counter;
341 use std::collections::HashMap;
342 use std::panic::{catch_unwind, AssertUnwindSafe};
343 use std::str::FromStr;
344 use std::sync::Mutex;
345 use tracing::{error, Level};
346 use utils::reschedule;
347
348 fn test_error_future<R: Runner>(runner: R) {
349 async fn error_future() -> Result<&'static str, &'static str> {
350 Err("An error occurred")
351 }
352 let result = runner.start(|_| error_future());
353 assert_eq!(result, Err("An error occurred"));
354 }
355
356 fn test_clock_sleep<R: Runner>(runner: R)
357 where
358 R::Context: Spawner + Clock,
359 {
360 runner.start(|context| async move {
361 let start = context.current();
363 let sleep_duration = Duration::from_millis(10);
364 context.sleep(sleep_duration).await;
365
366 let end = context.current();
368 assert!(end.duration_since(start).unwrap() >= sleep_duration);
369 });
370 }
371
372 fn test_clock_sleep_until<R: Runner>(runner: R)
373 where
374 R::Context: Spawner + Clock,
375 {
376 runner.start(|context| async move {
377 let now = context.current();
379 context.sleep_until(now + Duration::from_millis(100)).await;
380
381 let elapsed = now.elapsed().unwrap();
383 assert!(elapsed >= Duration::from_millis(100));
384 });
385 }
386
387 fn test_root_finishes<R: Runner>(runner: R)
388 where
389 R::Context: Spawner,
390 {
391 runner.start(|context| async move {
392 context.spawn(|_| async move {
393 loop {
394 reschedule().await;
395 }
396 });
397 });
398 }
399
400 fn test_spawn_abort<R: Runner>(runner: R)
401 where
402 R::Context: Spawner,
403 {
404 runner.start(|context| async move {
405 let handle = context.spawn(|_| async move {
406 loop {
407 reschedule().await;
408 }
409 });
410 handle.abort();
411 assert!(matches!(handle.await, Err(Error::Closed)));
412 });
413 }
414
415 fn test_panic_aborts_root<R: Runner>(runner: R) {
416 let result = catch_unwind(AssertUnwindSafe(|| {
417 runner.start(|_| async move {
418 panic!("blah");
419 });
420 }));
421 result.unwrap_err();
422 }
423
424 fn test_panic_aborts_spawn<R: Runner>(runner: R)
425 where
426 R::Context: Spawner,
427 {
428 let result = runner.start(|context| async move {
429 let result = context.spawn(|_| async move {
430 panic!("blah");
431 });
432 assert!(matches!(result.await, Err(Error::Exited)));
433 Result::<(), Error>::Ok(())
434 });
435
436 result.unwrap();
438 }
439
440 fn test_select<R: Runner>(runner: R) {
441 runner.start(|_| async move {
442 let output = Mutex::new(0);
444 select! {
445 v1 = ready(1) => {
446 *output.lock().unwrap() = v1;
447 },
448 v2 = ready(2) => {
449 *output.lock().unwrap() = v2;
450 },
451 };
452 assert_eq!(*output.lock().unwrap(), 1);
453
454 select! {
456 v1 = std::future::pending::<i32>() => {
457 *output.lock().unwrap() = v1;
458 },
459 v2 = ready(2) => {
460 *output.lock().unwrap() = v2;
461 },
462 };
463 assert_eq!(*output.lock().unwrap(), 2);
464 });
465 }
466
467 fn test_select_loop<R: Runner>(runner: R)
469 where
470 R::Context: Clock,
471 {
472 runner.start(|context| async move {
473 let (mut sender, mut receiver) = mpsc::unbounded();
475 for _ in 0..2 {
476 select! {
477 v = receiver.next() => {
478 panic!("unexpected value: {:?}", v);
479 },
480 _ = context.sleep(Duration::from_millis(100)) => {
481 continue;
482 },
483 };
484 }
485
486 sender.send(0).await.unwrap();
488 sender.send(1).await.unwrap();
489
490 select! {
492 _ = async {} => {
493 },
495 v = receiver.next() => {
496 panic!("unexpected value: {:?}", v);
497 },
498 };
499
500 for i in 0..2 {
502 select! {
503 _ = context.sleep(Duration::from_millis(100)) => {
504 panic!("timeout");
505 },
506 v = receiver.next() => {
507 assert_eq!(v.unwrap(), i);
508 },
509 };
510 }
511 });
512 }
513
514 fn test_storage_operations<R: Runner>(runner: R)
515 where
516 R::Context: Storage,
517 {
518 runner.start(|context| async move {
519 let partition = "test_partition";
520 let name = b"test_blob";
521
522 let (blob, _) = context
524 .open(partition, name)
525 .await
526 .expect("Failed to open blob");
527
528 let data = b"Hello, Storage!";
530 blob.write_at(data, 0)
531 .await
532 .expect("Failed to write to blob");
533
534 blob.sync().await.expect("Failed to sync blob");
536
537 let mut buffer = vec![0u8; data.len()];
539 blob.read_at(&mut buffer, 0)
540 .await
541 .expect("Failed to read from blob");
542 assert_eq!(&buffer, data);
543
544 blob.close().await.expect("Failed to close blob");
546
547 let blobs = context
549 .scan(partition)
550 .await
551 .expect("Failed to scan partition");
552 assert!(blobs.contains(&name.to_vec()));
553
554 let (blob, len) = context
556 .open(partition, name)
557 .await
558 .expect("Failed to reopen blob");
559 assert_eq!(len, data.len() as u64);
560
561 let mut buffer = vec![0u8; 7];
563 blob.read_at(&mut buffer, 7)
564 .await
565 .expect("Failed to read data");
566 assert_eq!(&buffer, b"Storage");
567
568 blob.close().await.expect("Failed to close blob");
570
571 context
573 .remove(partition, Some(name))
574 .await
575 .expect("Failed to remove blob");
576
577 let blobs = context
579 .scan(partition)
580 .await
581 .expect("Failed to scan partition");
582 assert!(!blobs.contains(&name.to_vec()));
583
584 context
586 .remove(partition, None)
587 .await
588 .expect("Failed to remove partition");
589
590 let result = context.scan(partition).await;
592 assert!(matches!(result, Err(Error::PartitionMissing(_))));
593 });
594 }
595
596 fn test_blob_read_write<R: Runner>(runner: R)
597 where
598 R::Context: Storage,
599 {
600 runner.start(|context| async move {
601 let partition = "test_partition";
602 let name = b"test_blob_rw";
603
604 let (blob, _) = context
606 .open(partition, name)
607 .await
608 .expect("Failed to open blob");
609
610 let data1 = b"Hello";
612 let data2 = b"World";
613 blob.write_at(data1, 0)
614 .await
615 .expect("Failed to write data1");
616 blob.write_at(data2, 5)
617 .await
618 .expect("Failed to write data2");
619
620 let mut buffer = vec![0u8; 10];
622 blob.read_at(&mut buffer, 0)
623 .await
624 .expect("Failed to read data");
625 assert_eq!(&buffer[..5], data1);
626 assert_eq!(&buffer[5..], data2);
627
628 let data3 = b"Store";
630 blob.write_at(data3, 5)
631 .await
632 .expect("Failed to write data3");
633
634 blob.truncate(5).await.expect("Failed to truncate blob");
636 let mut buffer = vec![0u8; 5];
637 blob.read_at(&mut buffer, 0)
638 .await
639 .expect("Failed to read data");
640 assert_eq!(&buffer[..5], data1);
641
642 let mut buffer = vec![0u8; 10];
644 let result = blob.read_at(&mut buffer, 0).await;
645 assert!(result.is_err());
646
647 blob.close().await.expect("Failed to close blob");
649 });
650 }
651
652 fn test_many_partition_read_write<R: Runner>(runner: R)
653 where
654 R::Context: Storage,
655 {
656 runner.start(|context| async move {
657 let partitions = ["partition1", "partition2", "partition3"];
658 let name = b"test_blob_rw";
659 let data1 = b"Hello";
660 let data2 = b"World";
661
662 for (additional, partition) in partitions.iter().enumerate() {
663 let (blob, _) = context
665 .open(partition, name)
666 .await
667 .expect("Failed to open blob");
668
669 blob.write_at(data1, 0)
671 .await
672 .expect("Failed to write data1");
673 blob.write_at(data2, 5 + additional as u64)
674 .await
675 .expect("Failed to write data2");
676
677 blob.close().await.expect("Failed to close blob");
679 }
680
681 for (additional, partition) in partitions.iter().enumerate() {
682 let (blob, len) = context
684 .open(partition, name)
685 .await
686 .expect("Failed to open blob");
687 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
688
689 let mut buffer = vec![0u8; 10 + additional];
691 blob.read_at(&mut buffer, 0)
692 .await
693 .expect("Failed to read data");
694 assert_eq!(&buffer[..5], b"Hello");
695 assert_eq!(&buffer[5 + additional..], b"World");
696
697 blob.close().await.expect("Failed to close blob");
699 }
700 });
701 }
702
703 fn test_blob_read_past_length<R: Runner>(runner: R)
704 where
705 R::Context: Storage,
706 {
707 runner.start(|context| async move {
708 let partition = "test_partition";
709 let name = b"test_blob_rw";
710
711 let (blob, _) = context
713 .open(partition, name)
714 .await
715 .expect("Failed to open blob");
716
717 let mut buffer = vec![0u8; 10];
719 let result = blob.read_at(&mut buffer, 0).await;
720 assert!(result.is_err());
721
722 let data = b"Hello, Storage!";
724 blob.write_at(data, 0)
725 .await
726 .expect("Failed to write to blob");
727
728 let mut buffer = vec![0u8; 20];
730 let result = blob.read_at(&mut buffer, 0).await;
731 assert!(result.is_err());
732 })
733 }
734
735 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
736 where
737 R::Context: Spawner + Storage + Metrics,
738 {
739 runner.start(|context| async move {
740 let partition = "test_partition";
741 let name = b"test_blob_rw";
742
743 let (blob, _) = context
745 .open(partition, name)
746 .await
747 .expect("Failed to open blob");
748
749 let data = b"Hello, Storage!";
751 blob.write_at(data, 0)
752 .await
753 .expect("Failed to write to blob");
754
755 blob.sync().await.expect("Failed to sync blob");
757
758 let check1 = context.with_label("check1").spawn({
760 let blob = blob.clone();
761 move |_| async move {
762 let mut buffer = vec![0u8; data.len()];
763 blob.read_at(&mut buffer, 0)
764 .await
765 .expect("Failed to read from blob");
766 assert_eq!(&buffer, data);
767 }
768 });
769 let check2 = context.with_label("check2").spawn({
770 let blob = blob.clone();
771 move |_| async move {
772 let mut buffer = vec![0u8; data.len()];
773 blob.read_at(&mut buffer, 0)
774 .await
775 .expect("Failed to read from blob");
776 assert_eq!(&buffer, data);
777 }
778 });
779
780 let result = join!(check1, check2);
782 assert!(result.0.is_ok());
783 assert!(result.1.is_ok());
784
785 let mut buffer = vec![0u8; data.len()];
787 blob.read_at(&mut buffer, 0)
788 .await
789 .expect("Failed to read from blob");
790 assert_eq!(&buffer, data);
791
792 blob.close().await.expect("Failed to close blob");
794
795 let buffer = context.encode();
797 assert!(buffer.contains("open_blobs 0"));
798 });
799 }
800
801 fn test_shutdown<R: Runner>(runner: R)
802 where
803 R::Context: Spawner + Metrics + Clock,
804 {
805 let kill = 9;
806 runner.start(|context| async move {
807 let before = context
809 .with_label("before")
810 .spawn(move |context| async move {
811 let sig = context.stopped().await;
812 assert_eq!(sig.unwrap(), kill);
813 });
814
815 let after = context
817 .with_label("after")
818 .spawn(move |context| async move {
819 let mut signal = context.stopped();
821 loop {
822 select! {
823 sig = &mut signal => {
824 assert_eq!(sig.unwrap(), kill);
826 break;
827 },
828 _ = context.sleep(Duration::from_millis(10)) => {
829 },
831 }
832 }
833 });
834
835 context.sleep(Duration::from_millis(50)).await;
837
838 context.stop(kill);
840
841 let result = join!(before, after);
843 assert!(result.0.is_ok());
844 assert!(result.1.is_ok());
845 });
846 }
847
848 fn test_spawn_ref<R: Runner>(runner: R)
849 where
850 R::Context: Spawner,
851 {
852 runner.start(|mut context| async move {
853 let handle = context.spawn_ref();
854 let result = handle(async move { 42 }).await;
855 assert!(matches!(result, Ok(42)));
856 });
857 }
858
859 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
860 where
861 R::Context: Spawner,
862 {
863 runner.start(|mut context| async move {
864 let handle = context.spawn_ref();
865 let result = handle(async move { 42 }).await;
866 assert!(matches!(result, Ok(42)));
867
868 let handle = context.spawn_ref();
870 let result = handle(async move { 42 }).await;
871 assert!(matches!(result, Ok(42)));
872 });
873 }
874
875 fn test_spawn_duplicate<R: Runner>(runner: R)
876 where
877 R::Context: Spawner,
878 {
879 runner.start(|mut context| async move {
880 let handle = context.spawn_ref();
881 let result = handle(async move { 42 }).await;
882 assert!(matches!(result, Ok(42)));
883
884 context.spawn(|_| async move { 42 });
886 });
887 }
888
889 fn test_spawn_blocking<R: Runner>(runner: R)
890 where
891 R::Context: Spawner,
892 {
893 runner.start(|context| async move {
894 let handle = context.spawn_blocking(|| 42);
895 let result = handle.await;
896 assert!(matches!(result, Ok(42)));
897 });
898 }
899
900 fn test_spawn_blocking_abort<R: Runner>(runner: R)
901 where
902 R::Context: Spawner,
903 {
904 runner.start(|context| async move {
905 let (sender, mut receiver) = oneshot::channel();
907 let handle = context.spawn_blocking(move || {
908 loop {
910 if receiver.try_recv().is_ok() {
911 break;
912 }
913 }
914
915 let mut count = 0;
917 loop {
918 count += 1;
919 if count >= 100_000_000 {
920 break;
921 }
922 }
923 count
924 });
925
926 handle.abort();
932 sender.send(()).unwrap();
933
934 assert!(matches!(handle.await, Ok(100_000_000)));
936 });
937 }
938
939 fn test_metrics<R: Runner>(runner: R)
940 where
941 R::Context: Metrics,
942 {
943 runner.start(|context| async move {
944 assert_eq!(context.label(), "");
946
947 let counter = Counter::<u64>::default();
949 context.register("test", "test", counter.clone());
950
951 counter.inc();
953
954 let buffer = context.encode();
956 assert!(buffer.contains("test_total 1"));
957
958 let context = context.with_label("nested");
960 let nested_counter = Counter::<u64>::default();
961 context.register("test", "test", nested_counter.clone());
962
963 nested_counter.inc();
965
966 let buffer = context.encode();
968 assert!(buffer.contains("nested_test_total 1"));
969 assert!(buffer.contains("test_total 1"));
970 });
971 }
972
973 fn test_metrics_label<R: Runner>(runner: R)
974 where
975 R::Context: Metrics,
976 {
977 runner.start(|context| async move {
978 context.with_label(METRICS_PREFIX);
979 })
980 }
981
/// Runs the shared `test_error_future` scenario on the deterministic runtime.
#[test]
fn test_deterministic_future() {
    test_error_future(deterministic::Runner::default());
}
987
/// Runs the shared `test_clock_sleep` scenario on the deterministic runtime.
#[test]
fn test_deterministic_clock_sleep() {
    test_clock_sleep(deterministic::Runner::default());
}
993
/// Runs the shared `test_clock_sleep_until` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_clock_sleep_until() {
    test_clock_sleep_until(deterministic::Runner::default());
}
999
/// Runs the shared `test_root_finishes` scenario on the deterministic runtime.
#[test]
fn test_deterministic_root_finishes() {
    test_root_finishes(deterministic::Runner::default());
}
1005
/// Runs the shared `test_spawn_abort` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_abort() {
    test_spawn_abort(deterministic::Runner::default());
}
1011
/// Runs the shared `test_panic_aborts_root` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_panic_aborts_root() {
    test_panic_aborts_root(deterministic::Runner::default());
}
1017
/// Runs the shared `test_panic_aborts_spawn` scenario on the deterministic
/// runtime, where the spawned task's panic ("blah") is expected to surface as
/// a panic of the test itself.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    test_panic_aborts_spawn(deterministic::Runner::default());
}
1024
/// Runs the shared `test_select` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select() {
    test_select(deterministic::Runner::default());
}
1030
/// Runs the shared `test_select_loop` scenario on the deterministic runtime.
#[test]
fn test_deterministic_select_loop() {
    test_select_loop(deterministic::Runner::default());
}
1036
/// Runs the shared `test_storage_operations` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_storage_operations() {
    test_storage_operations(deterministic::Runner::default());
}
1042
/// Runs the shared `test_blob_read_write` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_blob_read_write() {
    test_blob_read_write(deterministic::Runner::default());
}
1048
/// Runs the shared `test_many_partition_read_write` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_many_partition_read_write() {
    test_many_partition_read_write(deterministic::Runner::default());
}
1054
/// Runs the shared `test_blob_read_past_length` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_blob_read_past_length() {
    test_blob_read_past_length(deterministic::Runner::default());
}
1060
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// deterministic runtime.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(deterministic::Runner::default());
}
1067
/// Runs the shared `test_shutdown` scenario on the deterministic runtime.
#[test]
fn test_deterministic_shutdown() {
    test_shutdown(deterministic::Runner::default());
}
1073
/// Runs the shared `test_spawn_ref` scenario on the deterministic runtime.
#[test]
fn test_deterministic_spawn_ref() {
    test_spawn_ref(deterministic::Runner::default());
}
1079
/// Runs the shared `test_spawn_ref_duplicate` scenario on the deterministic
/// runtime; reusing the context is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(deterministic::Runner::default());
}
1086
/// Runs the shared `test_spawn_duplicate` scenario on the deterministic
/// runtime; reusing the context is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    test_spawn_duplicate(deterministic::Runner::default());
}
1093
/// Runs the shared `test_spawn_blocking` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_spawn_blocking() {
    test_spawn_blocking(deterministic::Runner::default());
}
1099
/// Spawns a panicking blocking task on the deterministic runtime and expects
/// the panic message to surface as a panic of the test itself.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    deterministic::Runner::default().start(|context| async move {
        let doomed = context.spawn_blocking(|| {
            panic!("blocking task panicked");
        });
        doomed.await.unwrap();
    });
}
1111
/// Runs the shared `test_spawn_blocking_abort` scenario on the deterministic
/// runtime.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    test_spawn_blocking_abort(deterministic::Runner::default());
}
1117
/// Runs the shared `test_metrics` scenario on the deterministic runtime.
#[test]
fn test_deterministic_metrics() {
    test_metrics(deterministic::Runner::default());
}
1123
/// Runs the shared `test_metrics_label` scenario on the deterministic
/// runtime; using the reserved label is expected to panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    test_metrics_label(deterministic::Runner::default());
}
1130
/// Runs the shared `test_error_future` scenario on the tokio runtime.
#[test]
fn test_tokio_error_future() {
    test_error_future(tokio::Runner::default());
}
1136
/// Runs the shared `test_clock_sleep` scenario on the tokio runtime.
#[test]
fn test_tokio_clock_sleep() {
    test_clock_sleep(tokio::Runner::default());
}
1142
/// Runs the shared `test_clock_sleep_until` scenario on the tokio runtime.
#[test]
fn test_tokio_clock_sleep_until() {
    test_clock_sleep_until(tokio::Runner::default());
}
1148
/// Runs the shared `test_root_finishes` scenario on the tokio runtime.
#[test]
fn test_tokio_root_finishes() {
    test_root_finishes(tokio::Runner::default());
}
1154
/// Runs the shared `test_spawn_abort` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_abort() {
    test_spawn_abort(tokio::Runner::default());
}
1160
/// Runs the shared `test_panic_aborts_root` scenario on the tokio runtime.
#[test]
fn test_tokio_panic_aborts_root() {
    test_panic_aborts_root(tokio::Runner::default());
}
1166
/// Runs the shared `test_panic_aborts_spawn` scenario on the tokio runtime,
/// where the spawned task's panic is reported through its handle rather than
/// failing the test.
#[test]
fn test_tokio_panic_aborts_spawn() {
    test_panic_aborts_spawn(tokio::Runner::default());
}
1172
/// Runs the shared `test_select` scenario on the tokio runtime.
#[test]
fn test_tokio_select() {
    test_select(tokio::Runner::default());
}
1178
/// Runs the shared `test_select_loop` scenario on the tokio runtime.
#[test]
fn test_tokio_select_loop() {
    test_select_loop(tokio::Runner::default());
}
1184
/// Runs the shared `test_storage_operations` scenario on the tokio runtime.
#[test]
fn test_tokio_storage_operations() {
    test_storage_operations(tokio::Runner::default());
}
1190
/// Runs the shared `test_blob_read_write` scenario on the tokio runtime.
#[test]
fn test_tokio_blob_read_write() {
    test_blob_read_write(tokio::Runner::default());
}
1196
/// Runs the shared `test_many_partition_read_write` scenario on the tokio
/// runtime.
#[test]
fn test_tokio_many_partition_read_write() {
    test_many_partition_read_write(tokio::Runner::default());
}
1202
/// Runs the shared `test_blob_read_past_length` scenario on the tokio runtime.
#[test]
fn test_tokio_blob_read_past_length() {
    test_blob_read_past_length(tokio::Runner::default());
}
1208
/// Runs the shared `test_blob_clone_and_concurrent_read` scenario on the
/// tokio runtime.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    test_blob_clone_and_concurrent_read(tokio::Runner::default());
}
1215
/// Runs the shared `test_shutdown` scenario on the tokio runtime.
#[test]
fn test_tokio_shutdown() {
    test_shutdown(tokio::Runner::default());
}
1221
/// Runs the shared `test_spawn_ref` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_ref() {
    test_spawn_ref(tokio::Runner::default());
}
1227
/// Runs the shared `test_spawn_ref_duplicate` scenario on the tokio runtime;
/// reusing the context is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    test_spawn_ref_duplicate(tokio::Runner::default());
}
1234
/// Runs the shared `test_spawn_duplicate` scenario on the tokio runtime;
/// reusing the context is expected to panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    test_spawn_duplicate(tokio::Runner::default());
}
1241
/// Runs the shared `test_spawn_blocking` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_blocking() {
    test_spawn_blocking(tokio::Runner::default());
}
1247
/// Spawns a panicking blocking task on the tokio runtime and expects the
/// panic to be reported through the handle as `Error::Exited`.
#[test]
fn test_tokio_spawn_blocking_panic() {
    tokio::Runner::default().start(|context| async move {
        let doomed = context.spawn_blocking(|| {
            panic!("blocking task panicked");
        });
        let outcome = doomed.await;
        assert!(matches!(outcome, Err(Error::Exited)));
    });
}
1259
/// Runs the shared `test_spawn_blocking_abort` scenario on the tokio runtime.
#[test]
fn test_tokio_spawn_blocking_abort() {
    test_spawn_blocking_abort(tokio::Runner::default());
}
1265
/// Runs the shared `test_metrics` scenario on the tokio runtime.
#[test]
fn test_tokio_metrics() {
    test_metrics(tokio::Runner::default());
}
1271
/// Runs the shared `test_metrics_label` scenario on the tokio runtime; using
/// the reserved label is expected to panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    test_metrics_label(tokio::Runner::default());
}
1278
/// Starts the tokio telemetry endpoint, registers a counter, and fetches
/// `/metrics` over a raw connection to check that the counter is exported.
///
/// Header parsing is tolerant of what HTTP allows: field names are stored in
/// ASCII lowercase (they are case-insensitive) and the value is split at the
/// first `:` and trimmed (whitespace after the colon is optional).
#[test]
fn test_tokio_telemetry() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        // Address the metrics endpoint listens on.
        let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();

        // Start telemetry with the metrics endpoint enabled.
        tokio::telemetry::init(
            context.with_label("metrics"),
            Level::INFO,
            Some(address),
            None,
        );

        // Register and bump a counter so there is something to scrape.
        let counter: Counter<u64> = Counter::default();
        context.register("test_counter", "Test counter", counter.clone());
        counter.inc();

        /// Reads bytes up to (and excluding) the next `\n`, dropping a
        /// trailing `\r` if present.
        async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
            let mut line = Vec::new();
            loop {
                let mut byte = [0; 1];
                stream.recv(&mut byte).await?;
                if byte[0] == b'\n' {
                    if line.last() == Some(&b'\r') {
                        line.pop();
                    }
                    break;
                }
                line.push(byte[0]);
            }
            String::from_utf8(line).map_err(|_| Error::ReadFailed)
        }

        /// Reads header lines until the empty line that ends the header
        /// section. Keys are lowercased; lines without a `:` are skipped.
        async fn read_headers<St: Stream>(
            stream: &mut St,
        ) -> Result<HashMap<String, String>, Error> {
            let mut headers = HashMap::new();
            loop {
                let line = read_line(stream).await?;
                if line.is_empty() {
                    break;
                }
                if let Some((field, value)) = line.split_once(':') {
                    headers.insert(
                        field.trim().to_ascii_lowercase(),
                        value.trim().to_string(),
                    );
                }
            }
            Ok(headers)
        }

        /// Reads exactly `content_length` bytes and decodes them as UTF-8.
        async fn read_body<St: Stream>(
            stream: &mut St,
            content_length: usize,
        ) -> Result<String, Error> {
            let mut body = vec![0; content_length];
            stream.recv(&mut body).await?;
            String::from_utf8(body).map_err(|_| Error::ReadFailed)
        }

        let client_handle = context
            .with_label("client")
            .spawn(move |context| async move {
                // Retry until the endpoint accepts connections.
                let (mut sink, mut stream) = loop {
                    match context.dial(address).await {
                        Ok((sink, stream)) => break (sink, stream),
                        Err(e) => {
                            error!(err = ?e, "failed to connect");
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    }
                };

                // Issue a minimal HTTP/1.1 request.
                let request = format!(
                    "GET /metrics HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
                    address
                );
                sink.send(request.as_bytes()).await.unwrap();

                // Status line.
                let status_line = read_line(&mut stream).await.unwrap();
                assert_eq!(status_line, "HTTP/1.1 200 OK");

                // Headers (keys are lowercase after parsing).
                let headers = read_headers(&mut stream).await.unwrap();
                println!("Headers: {:?}", headers);
                let content_length = headers
                    .get("content-length")
                    .unwrap()
                    .parse::<usize>()
                    .unwrap();

                // Body must contain the registered counter.
                let body = read_body(&mut stream, content_length).await.unwrap();
                assert!(body.contains("test_counter_total 1"));
            });

        client_handle.await.unwrap();
    });
}
1386}