1use prometheus_client::registry::Metric;
21use std::io::Error as IoError;
22use std::{
23 future::Future,
24 net::SocketAddr,
25 time::{Duration, SystemTime},
26};
27use thiserror::Error;
28
29pub mod deterministic;
30pub mod mocks;
31cfg_if::cfg_if! {
32 if #[cfg(not(target_arch = "wasm32"))] {
33 pub mod tokio;
34 pub mod benchmarks;
35 }
36}
37mod storage;
38pub mod telemetry;
39mod utils;
40pub use utils::{
41 create_pool, reschedule, Handle, RwLock, RwLockReadGuard, RwLockWriteGuard, Signal, Signaler,
42};
43
/// Label prefix that scoped metric labels may not start with; enforced by the
/// assertion in `Metrics::scoped_label`.
const METRICS_PREFIX: &str = "runtime";
46
/// Errors surfaced by the runtime interfaces declared in this module.
///
/// The `#[error(...)]` attribute on each variant defines its display message.
#[derive(Error, Debug)]
pub enum Error {
    /// A task exited without producing a value. The tests in this file observe it
    /// when awaiting the handle of a task that panicked.
    #[error("exited")]
    Exited,
    /// A task was closed. The tests in this file observe it when awaiting an
    /// aborted handle.
    #[error("closed")]
    Closed,
    /// An operation timed out.
    #[error("timeout")]
    Timeout,
    /// Binding failed.
    #[error("bind failed")]
    BindFailed,
    /// Establishing a connection failed.
    #[error("connection failed")]
    ConnectionFailed,
    /// A write failed.
    #[error("write failed")]
    WriteFailed,
    /// A read failed.
    #[error("read failed")]
    ReadFailed,
    /// A send failed.
    #[error("send failed")]
    SendFailed,
    /// A receive failed.
    #[error("recv failed")]
    RecvFailed,
    /// A partition could not be created.
    /// NOTE(review): the payload is presumably the partition name — confirm.
    #[error("partition creation failed: {0}")]
    PartitionCreationFailed(String),
    /// A partition does not exist. The tests in this file observe it when scanning
    /// a partition that has been removed.
    #[error("partition missing: {0}")]
    PartitionMissing(String),
    /// A partition is corrupt.
    #[error("partition corrupt: {0}")]
    PartitionCorrupt(String),
    /// Opening a blob failed with the wrapped I/O error.
    /// NOTE(review): the two strings are presumably partition and blob name — confirm.
    #[error("blob open failed: {0}/{1} error: {2}")]
    BlobOpenFailed(String, String, IoError),
    /// A blob does not exist.
    #[error("blob missing: {0}/{1}")]
    BlobMissing(String, String),
    /// Truncating a blob failed with the wrapped I/O error.
    #[error("blob truncate failed: {0}/{1} error: {2}")]
    BlobTruncateFailed(String, String, IoError),
    /// Syncing a blob failed with the wrapped I/O error.
    #[error("blob sync failed: {0}/{1} error: {2}")]
    BlobSyncFailed(String, String, IoError),
    /// Closing a blob failed with the wrapped I/O error.
    #[error("blob close failed: {0}/{1} error: {2}")]
    BlobCloseFailed(String, String, IoError),
    /// A blob is shorter than an operation required.
    #[error("blob insufficient length")]
    BlobInsufficientLength,
    /// An offset computation overflowed.
    #[error("offset overflow")]
    OffsetOverflow,
}
89
/// Entry point of a runtime: drives a root future and hands back its output.
pub trait Runner {
    /// Context type passed to the root closure given to [`Runner::start`].
    type Context;

    /// Calls `f` with a context and returns the output of the future `f` produces.
    ///
    /// Consumes the runner.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future;
}
106
/// Interface for spawning tasks and for signaling/observing shutdown.
pub trait Spawner: Clone + Send + Sync + 'static {
    /// Spawns the future produced by `f` and returns a [`Handle`] to its output.
    ///
    /// The context is consumed and handed to `f`. The tests in this file expect that
    /// awaiting an aborted handle yields `Err(Error::Closed)`.
    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Returns a closure that spawns a future and yields a [`Handle`] to its output,
    /// without consuming the context.
    ///
    /// The tests in this file expect a second `spawn_ref` (or a later `spawn`) on
    /// the same context to panic.
    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Spawns the synchronous closure `f` and returns a [`Handle`] to its return value.
    ///
    /// The tests in this file expect that aborting the returned handle does not
    /// prevent `f` from running to completion.
    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static;

    /// Signals a stop carrying `value`.
    fn stop(&self, value: i32);

    /// Returns a [`Signal`] for observing a stop; in the tests in this file it
    /// resolves to the value passed to [`Spawner::stop`].
    fn stopped(&self) -> Signal;
}
167
168pub trait Metrics: Clone + Send + Sync + 'static {
170 fn label(&self) -> String;
172
173 fn with_label(&self, label: &str) -> Self;
181
182 fn scoped_label(&self, label: &str) -> String {
186 let label = if self.label().is_empty() {
187 label.to_string()
188 } else {
189 format!("{}_{}", self.label(), label)
190 };
191 assert!(
192 !label.starts_with(METRICS_PREFIX),
193 "using runtime label is not allowed"
194 );
195 label
196 }
197
198 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
202
203 fn encode(&self) -> String;
205}
206
/// Interface for reading the runtime's time and for sleeping.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time according to this runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration` has passed.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` has been reached.
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
}
222
/// Interface for accepting and opening connections, generic over the listener
/// (`L`), sink (`Si`) and stream (`St`) types it produces.
pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
where
    L: Listener<Si, St>,
    Si: Sink,
    St: Stream,
{
    /// Binds to `socket` and resolves to a listener, or an [`Error`] on failure.
    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;

    /// Connects to `socket` and resolves to a sink/stream pair, or an [`Error`] on failure.
    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
}
237
/// Interface for accepting incoming connections.
pub trait Listener<Si, St>: Sync + Send + 'static
where
    Si: Sink,
    St: Stream,
{
    /// Resolves to the next connection as a socket address plus a sink/stream pair.
    /// NOTE(review): the address is presumably the remote peer's — confirm.
    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
}
248
/// Interface for the sending half of a connection.
pub trait Sink: Sync + Send + 'static {
    /// Sends the bytes in `msg`, resolving to an [`Error`] on failure.
    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
255
/// Interface for the receiving half of a connection.
pub trait Stream: Sync + Send + 'static {
    /// Receives bytes into `buf`, resolving to an [`Error`] on failure.
    /// NOTE(review): callers in this file size `buf` to the exact number of bytes
    /// they expect, which suggests `buf` is filled completely — confirm.
    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
}
263
/// Interface for blob storage organized into named partitions.
pub trait Storage: Clone + Send + Sync + 'static {
    /// Blob type produced by [`Storage::open`].
    type Blob: Blob;

    /// Opens the blob `name` in `partition`, resolving to the blob and a `u64`.
    ///
    /// The tests in this file open blobs that were never created before, and on
    /// reopen compare the `u64` against the number of bytes previously written
    /// (i.e. they treat it as the blob's length).
    fn open(
        &self,
        partition: &str,
        name: &[u8],
    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;

    /// Removes the blob `name` from `partition` when `name` is `Some`.
    ///
    /// With `None`, the tests in this file expect the whole partition to be
    /// removed (a later `scan` yields `Error::PartitionMissing`).
    fn remove(
        &self,
        partition: &str,
        name: Option<&[u8]>,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Resolves to the names of the blobs in `partition`.
    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
}
298
/// Interface for reading from and writing to a single blob at explicit offsets.
// NOTE(review): no `len` method is declared on this trait in the visible code, so
// the `len_without_is_empty` allowance below may be stale — confirm.
#[allow(clippy::len_without_is_empty)]
pub trait Blob: Clone + Send + Sync + 'static {
    /// Reads into `buf` starting at `offset`.
    ///
    /// The tests in this file expect an error when the requested range extends
    /// past the end of the blob.
    fn read_at(
        &self,
        buf: &mut [u8],
        offset: u64,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Writes the bytes in `buf` starting at `offset`.
    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Truncates the blob to `len` bytes.
    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;

    /// Syncs the blob.
    /// NOTE(review): presumably flushes pending writes to durable storage — confirm.
    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Closes the blob, consuming this handle.
    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use commonware_macros::select;
338 use futures::channel::oneshot;
339 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
340 use prometheus_client::metrics::counter::Counter;
341 use std::collections::HashMap;
342 use std::panic::{catch_unwind, AssertUnwindSafe};
343 use std::str::FromStr;
344 use std::sync::Mutex;
345 use telemetry::metrics;
346 use tracing::error;
347 use utils::reschedule;
348
349 fn test_error_future<R: Runner>(runner: R) {
350 async fn error_future() -> Result<&'static str, &'static str> {
351 Err("An error occurred")
352 }
353 let result = runner.start(|_| error_future());
354 assert_eq!(result, Err("An error occurred"));
355 }
356
357 fn test_clock_sleep<R: Runner>(runner: R)
358 where
359 R::Context: Spawner + Clock,
360 {
361 runner.start(|context| async move {
362 let start = context.current();
364 let sleep_duration = Duration::from_millis(10);
365 context.sleep(sleep_duration).await;
366
367 let end = context.current();
369 assert!(end.duration_since(start).unwrap() >= sleep_duration);
370 });
371 }
372
373 fn test_clock_sleep_until<R: Runner>(runner: R)
374 where
375 R::Context: Spawner + Clock,
376 {
377 runner.start(|context| async move {
378 let now = context.current();
380 context.sleep_until(now + Duration::from_millis(100)).await;
381
382 let elapsed = now.elapsed().unwrap();
384 assert!(elapsed >= Duration::from_millis(100));
385 });
386 }
387
388 fn test_root_finishes<R: Runner>(runner: R)
389 where
390 R::Context: Spawner,
391 {
392 runner.start(|context| async move {
393 context.spawn(|_| async move {
394 loop {
395 reschedule().await;
396 }
397 });
398 });
399 }
400
401 fn test_spawn_abort<R: Runner>(runner: R)
402 where
403 R::Context: Spawner,
404 {
405 runner.start(|context| async move {
406 let handle = context.spawn(|_| async move {
407 loop {
408 reschedule().await;
409 }
410 });
411 handle.abort();
412 assert!(matches!(handle.await, Err(Error::Closed)));
413 });
414 }
415
416 fn test_panic_aborts_root<R: Runner>(runner: R) {
417 let result = catch_unwind(AssertUnwindSafe(|| {
418 runner.start(|_| async move {
419 panic!("blah");
420 });
421 }));
422 result.unwrap_err();
423 }
424
425 fn test_panic_aborts_spawn<R: Runner>(runner: R)
426 where
427 R::Context: Spawner,
428 {
429 let result = runner.start(|context| async move {
430 let result = context.spawn(|_| async move {
431 panic!("blah");
432 });
433 assert!(matches!(result.await, Err(Error::Exited)));
434 Result::<(), Error>::Ok(())
435 });
436
437 result.unwrap();
439 }
440
441 fn test_select<R: Runner>(runner: R) {
442 runner.start(|_| async move {
443 let output = Mutex::new(0);
445 select! {
446 v1 = ready(1) => {
447 *output.lock().unwrap() = v1;
448 },
449 v2 = ready(2) => {
450 *output.lock().unwrap() = v2;
451 },
452 };
453 assert_eq!(*output.lock().unwrap(), 1);
454
455 select! {
457 v1 = std::future::pending::<i32>() => {
458 *output.lock().unwrap() = v1;
459 },
460 v2 = ready(2) => {
461 *output.lock().unwrap() = v2;
462 },
463 };
464 assert_eq!(*output.lock().unwrap(), 2);
465 });
466 }
467
468 fn test_select_loop<R: Runner>(runner: R)
470 where
471 R::Context: Clock,
472 {
473 runner.start(|context| async move {
474 let (mut sender, mut receiver) = mpsc::unbounded();
476 for _ in 0..2 {
477 select! {
478 v = receiver.next() => {
479 panic!("unexpected value: {:?}", v);
480 },
481 _ = context.sleep(Duration::from_millis(100)) => {
482 continue;
483 },
484 };
485 }
486
487 sender.send(0).await.unwrap();
489 sender.send(1).await.unwrap();
490
491 select! {
493 _ = async {} => {
494 },
496 v = receiver.next() => {
497 panic!("unexpected value: {:?}", v);
498 },
499 };
500
501 for i in 0..2 {
503 select! {
504 _ = context.sleep(Duration::from_millis(100)) => {
505 panic!("timeout");
506 },
507 v = receiver.next() => {
508 assert_eq!(v.unwrap(), i);
509 },
510 };
511 }
512 });
513 }
514
515 fn test_storage_operations<R: Runner>(runner: R)
516 where
517 R::Context: Storage,
518 {
519 runner.start(|context| async move {
520 let partition = "test_partition";
521 let name = b"test_blob";
522
523 let (blob, _) = context
525 .open(partition, name)
526 .await
527 .expect("Failed to open blob");
528
529 let data = b"Hello, Storage!";
531 blob.write_at(data, 0)
532 .await
533 .expect("Failed to write to blob");
534
535 blob.sync().await.expect("Failed to sync blob");
537
538 let mut buffer = vec![0u8; data.len()];
540 blob.read_at(&mut buffer, 0)
541 .await
542 .expect("Failed to read from blob");
543 assert_eq!(&buffer, data);
544
545 blob.close().await.expect("Failed to close blob");
547
548 let blobs = context
550 .scan(partition)
551 .await
552 .expect("Failed to scan partition");
553 assert!(blobs.contains(&name.to_vec()));
554
555 let (blob, len) = context
557 .open(partition, name)
558 .await
559 .expect("Failed to reopen blob");
560 assert_eq!(len, data.len() as u64);
561
562 let mut buffer = vec![0u8; 7];
564 blob.read_at(&mut buffer, 7)
565 .await
566 .expect("Failed to read data");
567 assert_eq!(&buffer, b"Storage");
568
569 blob.close().await.expect("Failed to close blob");
571
572 context
574 .remove(partition, Some(name))
575 .await
576 .expect("Failed to remove blob");
577
578 let blobs = context
580 .scan(partition)
581 .await
582 .expect("Failed to scan partition");
583 assert!(!blobs.contains(&name.to_vec()));
584
585 context
587 .remove(partition, None)
588 .await
589 .expect("Failed to remove partition");
590
591 let result = context.scan(partition).await;
593 assert!(matches!(result, Err(Error::PartitionMissing(_))));
594 });
595 }
596
597 fn test_blob_read_write<R: Runner>(runner: R)
598 where
599 R::Context: Storage,
600 {
601 runner.start(|context| async move {
602 let partition = "test_partition";
603 let name = b"test_blob_rw";
604
605 let (blob, _) = context
607 .open(partition, name)
608 .await
609 .expect("Failed to open blob");
610
611 let data1 = b"Hello";
613 let data2 = b"World";
614 blob.write_at(data1, 0)
615 .await
616 .expect("Failed to write data1");
617 blob.write_at(data2, 5)
618 .await
619 .expect("Failed to write data2");
620
621 let mut buffer = vec![0u8; 10];
623 blob.read_at(&mut buffer, 0)
624 .await
625 .expect("Failed to read data");
626 assert_eq!(&buffer[..5], data1);
627 assert_eq!(&buffer[5..], data2);
628
629 let data3 = b"Store";
631 blob.write_at(data3, 5)
632 .await
633 .expect("Failed to write data3");
634
635 blob.truncate(5).await.expect("Failed to truncate blob");
637 let mut buffer = vec![0u8; 5];
638 blob.read_at(&mut buffer, 0)
639 .await
640 .expect("Failed to read data");
641 assert_eq!(&buffer[..5], data1);
642
643 let mut buffer = vec![0u8; 10];
645 let result = blob.read_at(&mut buffer, 0).await;
646 assert!(result.is_err());
647
648 blob.close().await.expect("Failed to close blob");
650 });
651 }
652
653 fn test_many_partition_read_write<R: Runner>(runner: R)
654 where
655 R::Context: Storage,
656 {
657 runner.start(|context| async move {
658 let partitions = ["partition1", "partition2", "partition3"];
659 let name = b"test_blob_rw";
660 let data1 = b"Hello";
661 let data2 = b"World";
662
663 for (additional, partition) in partitions.iter().enumerate() {
664 let (blob, _) = context
666 .open(partition, name)
667 .await
668 .expect("Failed to open blob");
669
670 blob.write_at(data1, 0)
672 .await
673 .expect("Failed to write data1");
674 blob.write_at(data2, 5 + additional as u64)
675 .await
676 .expect("Failed to write data2");
677
678 blob.close().await.expect("Failed to close blob");
680 }
681
682 for (additional, partition) in partitions.iter().enumerate() {
683 let (blob, len) = context
685 .open(partition, name)
686 .await
687 .expect("Failed to open blob");
688 assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
689
690 let mut buffer = vec![0u8; 10 + additional];
692 blob.read_at(&mut buffer, 0)
693 .await
694 .expect("Failed to read data");
695 assert_eq!(&buffer[..5], b"Hello");
696 assert_eq!(&buffer[5 + additional..], b"World");
697
698 blob.close().await.expect("Failed to close blob");
700 }
701 });
702 }
703
704 fn test_blob_read_past_length<R: Runner>(runner: R)
705 where
706 R::Context: Storage,
707 {
708 runner.start(|context| async move {
709 let partition = "test_partition";
710 let name = b"test_blob_rw";
711
712 let (blob, _) = context
714 .open(partition, name)
715 .await
716 .expect("Failed to open blob");
717
718 let mut buffer = vec![0u8; 10];
720 let result = blob.read_at(&mut buffer, 0).await;
721 assert!(result.is_err());
722
723 let data = b"Hello, Storage!";
725 blob.write_at(data, 0)
726 .await
727 .expect("Failed to write to blob");
728
729 let mut buffer = vec![0u8; 20];
731 let result = blob.read_at(&mut buffer, 0).await;
732 assert!(result.is_err());
733 })
734 }
735
736 fn test_blob_clone_and_concurrent_read<R: Runner>(runner: R)
737 where
738 R::Context: Spawner + Storage + Metrics,
739 {
740 runner.start(|context| async move {
741 let partition = "test_partition";
742 let name = b"test_blob_rw";
743
744 let (blob, _) = context
746 .open(partition, name)
747 .await
748 .expect("Failed to open blob");
749
750 let data = b"Hello, Storage!";
752 blob.write_at(data, 0)
753 .await
754 .expect("Failed to write to blob");
755
756 blob.sync().await.expect("Failed to sync blob");
758
759 let check1 = context.with_label("check1").spawn({
761 let blob = blob.clone();
762 move |_| async move {
763 let mut buffer = vec![0u8; data.len()];
764 blob.read_at(&mut buffer, 0)
765 .await
766 .expect("Failed to read from blob");
767 assert_eq!(&buffer, data);
768 }
769 });
770 let check2 = context.with_label("check2").spawn({
771 let blob = blob.clone();
772 move |_| async move {
773 let mut buffer = vec![0u8; data.len()];
774 blob.read_at(&mut buffer, 0)
775 .await
776 .expect("Failed to read from blob");
777 assert_eq!(&buffer, data);
778 }
779 });
780
781 let result = join!(check1, check2);
783 assert!(result.0.is_ok());
784 assert!(result.1.is_ok());
785
786 let mut buffer = vec![0u8; data.len()];
788 blob.read_at(&mut buffer, 0)
789 .await
790 .expect("Failed to read from blob");
791 assert_eq!(&buffer, data);
792
793 blob.close().await.expect("Failed to close blob");
795
796 let buffer = context.encode();
798 assert!(buffer.contains("open_blobs 0"));
799 });
800 }
801
802 fn test_shutdown<R: Runner>(runner: R)
803 where
804 R::Context: Spawner + Metrics + Clock,
805 {
806 let kill = 9;
807 runner.start(|context| async move {
808 let before = context
810 .with_label("before")
811 .spawn(move |context| async move {
812 let sig = context.stopped().await;
813 assert_eq!(sig.unwrap(), kill);
814 });
815
816 let after = context
818 .with_label("after")
819 .spawn(move |context| async move {
820 let mut signal = context.stopped();
822 loop {
823 select! {
824 sig = &mut signal => {
825 assert_eq!(sig.unwrap(), kill);
827 break;
828 },
829 _ = context.sleep(Duration::from_millis(10)) => {
830 },
832 }
833 }
834 });
835
836 context.sleep(Duration::from_millis(50)).await;
838
839 context.stop(kill);
841
842 let result = join!(before, after);
844 assert!(result.0.is_ok());
845 assert!(result.1.is_ok());
846 });
847 }
848
849 fn test_spawn_ref<R: Runner>(runner: R)
850 where
851 R::Context: Spawner,
852 {
853 runner.start(|mut context| async move {
854 let handle = context.spawn_ref();
855 let result = handle(async move { 42 }).await;
856 assert!(matches!(result, Ok(42)));
857 });
858 }
859
860 fn test_spawn_ref_duplicate<R: Runner>(runner: R)
861 where
862 R::Context: Spawner,
863 {
864 runner.start(|mut context| async move {
865 let handle = context.spawn_ref();
866 let result = handle(async move { 42 }).await;
867 assert!(matches!(result, Ok(42)));
868
869 let handle = context.spawn_ref();
871 let result = handle(async move { 42 }).await;
872 assert!(matches!(result, Ok(42)));
873 });
874 }
875
876 fn test_spawn_duplicate<R: Runner>(runner: R)
877 where
878 R::Context: Spawner,
879 {
880 runner.start(|mut context| async move {
881 let handle = context.spawn_ref();
882 let result = handle(async move { 42 }).await;
883 assert!(matches!(result, Ok(42)));
884
885 context.spawn(|_| async move { 42 });
887 });
888 }
889
890 fn test_spawn_blocking<R: Runner>(runner: R)
891 where
892 R::Context: Spawner,
893 {
894 runner.start(|context| async move {
895 let handle = context.spawn_blocking(|| 42);
896 let result = handle.await;
897 assert!(matches!(result, Ok(42)));
898 });
899 }
900
901 fn test_spawn_blocking_abort<R: Runner>(runner: R)
902 where
903 R::Context: Spawner,
904 {
905 runner.start(|context| async move {
906 let (sender, mut receiver) = oneshot::channel();
908 let handle = context.spawn_blocking(move || {
909 loop {
911 if receiver.try_recv().is_ok() {
912 break;
913 }
914 }
915
916 let mut count = 0;
918 loop {
919 count += 1;
920 if count >= 100_000_000 {
921 break;
922 }
923 }
924 count
925 });
926
927 handle.abort();
933 sender.send(()).unwrap();
934
935 assert!(matches!(handle.await, Ok(100_000_000)));
937 });
938 }
939
940 fn test_metrics<R: Runner>(runner: R)
941 where
942 R::Context: Metrics,
943 {
944 runner.start(|context| async move {
945 assert_eq!(context.label(), "");
947
948 let counter = Counter::<u64>::default();
950 context.register("test", "test", counter.clone());
951
952 counter.inc();
954
955 let buffer = context.encode();
957 assert!(buffer.contains("test_total 1"));
958
959 let context = context.with_label("nested");
961 let nested_counter = Counter::<u64>::default();
962 context.register("test", "test", nested_counter.clone());
963
964 nested_counter.inc();
966
967 let buffer = context.encode();
969 assert!(buffer.contains("nested_test_total 1"));
970 assert!(buffer.contains("test_total 1"));
971 });
972 }
973
974 fn test_metrics_label<R: Runner>(runner: R)
975 where
976 R::Context: Metrics,
977 {
978 runner.start(|context| async move {
979 context.with_label(METRICS_PREFIX);
980 })
981 }
982
983 fn test_metrics_serve<R, L, Si, St>(runner: R)
984 where
985 R: Runner,
986 R::Context: Spawner + Metrics + Network<L, Si, St> + Clock,
987 L: Listener<Si, St>,
988 Si: Sink,
989 St: Stream,
990 {
991 runner.start(|context| async move {
992 let counter: Counter<u64> = Counter::default();
994 context.register("test_counter", "Test counter", counter.clone());
995 counter.inc();
996
997 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
999
1000 context
1002 .with_label("server")
1003 .spawn(move |context| async move {
1004 metrics::server::serve(context, address).await;
1005 });
1006
1007 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
1009 let mut line = Vec::new();
1010 loop {
1011 let mut byte = [0; 1];
1012 stream.recv(&mut byte).await?;
1013 if byte[0] == b'\n' {
1014 if line.last() == Some(&b'\r') {
1015 line.pop(); }
1017 break;
1018 }
1019 line.push(byte[0]);
1020 }
1021 String::from_utf8(line).map_err(|_| Error::ReadFailed)
1022 }
1023
1024 async fn read_headers<St: Stream>(
1025 stream: &mut St,
1026 ) -> Result<HashMap<String, String>, Error> {
1027 let mut headers = HashMap::new();
1028 loop {
1029 let line = read_line(stream).await?;
1030 if line.is_empty() {
1031 break;
1032 }
1033 let parts: Vec<&str> = line.splitn(2, ": ").collect();
1034 if parts.len() == 2 {
1035 headers.insert(parts[0].to_string(), parts[1].to_string());
1036 }
1037 }
1038 Ok(headers)
1039 }
1040
1041 async fn read_body<St: Stream>(
1042 stream: &mut St,
1043 content_length: usize,
1044 ) -> Result<String, Error> {
1045 let mut body = vec![0; content_length];
1046 stream.recv(&mut body).await?;
1047 String::from_utf8(body).map_err(|_| Error::ReadFailed)
1048 }
1049
1050 let client_handle = context
1052 .with_label("client")
1053 .spawn(move |context| async move {
1054 let (_, mut stream) = loop {
1055 match context.dial(address).await {
1056 Ok((sink, stream)) => break (sink, stream),
1057 Err(e) => {
1058 error!(err =?e, "failed to connect");
1060 context.sleep(Duration::from_millis(10)).await;
1061 }
1062 }
1063 };
1064
1065 let status_line = read_line(&mut stream).await.unwrap();
1067 assert_eq!(status_line, "HTTP/1.1 200 OK");
1068
1069 let headers = read_headers(&mut stream).await.unwrap();
1071 let content_length = headers
1072 .get("Content-Length")
1073 .unwrap()
1074 .parse::<usize>()
1075 .unwrap();
1076
1077 let body = read_body(&mut stream, content_length).await.unwrap();
1079 assert!(body.contains("test_counter_total 1"));
1080 });
1081
1082 client_handle.await.unwrap();
1084 });
1085 }
1086
1087 #[test]
1088 fn test_deterministic_future() {
1089 let runner = deterministic::Runner::default();
1090 test_error_future(runner);
1091 }
1092
1093 #[test]
1094 fn test_deterministic_clock_sleep() {
1095 let executor = deterministic::Runner::default();
1096 test_clock_sleep(executor);
1097 }
1098
1099 #[test]
1100 fn test_deterministic_clock_sleep_until() {
1101 let executor = deterministic::Runner::default();
1102 test_clock_sleep_until(executor);
1103 }
1104
1105 #[test]
1106 fn test_deterministic_root_finishes() {
1107 let executor = deterministic::Runner::default();
1108 test_root_finishes(executor);
1109 }
1110
1111 #[test]
1112 fn test_deterministic_spawn_abort() {
1113 let executor = deterministic::Runner::default();
1114 test_spawn_abort(executor);
1115 }
1116
1117 #[test]
1118 fn test_deterministic_panic_aborts_root() {
1119 let runner = deterministic::Runner::default();
1120 test_panic_aborts_root(runner);
1121 }
1122
1123 #[test]
1124 #[should_panic(expected = "blah")]
1125 fn test_deterministic_panic_aborts_spawn() {
1126 let executor = deterministic::Runner::default();
1127 test_panic_aborts_spawn(executor);
1128 }
1129
1130 #[test]
1131 fn test_deterministic_select() {
1132 let executor = deterministic::Runner::default();
1133 test_select(executor);
1134 }
1135
1136 #[test]
1137 fn test_deterministic_select_loop() {
1138 let executor = deterministic::Runner::default();
1139 test_select_loop(executor);
1140 }
1141
1142 #[test]
1143 fn test_deterministic_storage_operations() {
1144 let executor = deterministic::Runner::default();
1145 test_storage_operations(executor);
1146 }
1147
1148 #[test]
1149 fn test_deterministic_blob_read_write() {
1150 let executor = deterministic::Runner::default();
1151 test_blob_read_write(executor);
1152 }
1153
1154 #[test]
1155 fn test_deterministic_many_partition_read_write() {
1156 let executor = deterministic::Runner::default();
1157 test_many_partition_read_write(executor);
1158 }
1159
1160 #[test]
1161 fn test_deterministic_blob_read_past_length() {
1162 let executor = deterministic::Runner::default();
1163 test_blob_read_past_length(executor);
1164 }
1165
1166 #[test]
1167 fn test_deterministic_blob_clone_and_concurrent_read() {
1168 let executor = deterministic::Runner::default();
1170 test_blob_clone_and_concurrent_read(executor);
1171 }
1172
1173 #[test]
1174 fn test_deterministic_shutdown() {
1175 let executor = deterministic::Runner::default();
1176 test_shutdown(executor);
1177 }
1178
1179 #[test]
1180 fn test_deterministic_spawn_ref() {
1181 let executor = deterministic::Runner::default();
1182 test_spawn_ref(executor);
1183 }
1184
1185 #[test]
1186 #[should_panic]
1187 fn test_deterministic_spawn_ref_duplicate() {
1188 let executor = deterministic::Runner::default();
1189 test_spawn_ref_duplicate(executor);
1190 }
1191
1192 #[test]
1193 #[should_panic]
1194 fn test_deterministic_spawn_duplicate() {
1195 let executor = deterministic::Runner::default();
1196 test_spawn_duplicate(executor);
1197 }
1198
1199 #[test]
1200 fn test_deterministic_spawn_blocking() {
1201 let executor = deterministic::Runner::default();
1202 test_spawn_blocking(executor);
1203 }
1204
1205 #[test]
1206 #[should_panic(expected = "blocking task panicked")]
1207 fn test_deterministic_spawn_blocking_panic() {
1208 let executor = deterministic::Runner::default();
1209 executor.start(|context| async move {
1210 let handle = context.spawn_blocking(|| {
1211 panic!("blocking task panicked");
1212 });
1213 handle.await.unwrap();
1214 });
1215 }
1216
1217 #[test]
1218 fn test_deterministic_spawn_blocking_abort() {
1219 let executor = deterministic::Runner::default();
1220 test_spawn_blocking_abort(executor);
1221 }
1222
1223 #[test]
1224 fn test_deterministic_metrics() {
1225 let executor = deterministic::Runner::default();
1226 test_metrics(executor);
1227 }
1228
1229 #[test]
1230 #[should_panic]
1231 fn test_deterministic_metrics_label() {
1232 let executor = deterministic::Runner::default();
1233 test_metrics_label(executor);
1234 }
1235
1236 #[test]
1237 fn test_deterministic_metrics_serve() {
1238 let executor = deterministic::Runner::default();
1239 test_metrics_serve(executor);
1240 }
1241
1242 #[test]
1243 fn test_tokio_error_future() {
1244 let runner = tokio::Runner::default();
1245 test_error_future(runner);
1246 }
1247
1248 #[test]
1249 fn test_tokio_clock_sleep() {
1250 let executor = tokio::Runner::default();
1251 test_clock_sleep(executor);
1252 }
1253
1254 #[test]
1255 fn test_tokio_clock_sleep_until() {
1256 let executor = tokio::Runner::default();
1257 test_clock_sleep_until(executor);
1258 }
1259
1260 #[test]
1261 fn test_tokio_root_finishes() {
1262 let executor = tokio::Runner::default();
1263 test_root_finishes(executor);
1264 }
1265
1266 #[test]
1267 fn test_tokio_spawn_abort() {
1268 let executor = tokio::Runner::default();
1269 test_spawn_abort(executor);
1270 }
1271
1272 #[test]
1273 fn test_tokio_panic_aborts_root() {
1274 let executor = tokio::Runner::default();
1275 test_panic_aborts_root(executor);
1276 }
1277
1278 #[test]
1279 fn test_tokio_panic_aborts_spawn() {
1280 let executor = tokio::Runner::default();
1281 test_panic_aborts_spawn(executor);
1282 }
1283
1284 #[test]
1285 fn test_tokio_select() {
1286 let executor = tokio::Runner::default();
1287 test_select(executor);
1288 }
1289
1290 #[test]
1291 fn test_tokio_select_loop() {
1292 let executor = tokio::Runner::default();
1293 test_select_loop(executor);
1294 }
1295
1296 #[test]
1297 fn test_tokio_storage_operations() {
1298 let executor = tokio::Runner::default();
1299 test_storage_operations(executor);
1300 }
1301
1302 #[test]
1303 fn test_tokio_blob_read_write() {
1304 let executor = tokio::Runner::default();
1305 test_blob_read_write(executor);
1306 }
1307
1308 #[test]
1309 fn test_tokio_many_partition_read_write() {
1310 let executor = tokio::Runner::default();
1311 test_many_partition_read_write(executor);
1312 }
1313
1314 #[test]
1315 fn test_tokio_blob_read_past_length() {
1316 let executor = tokio::Runner::default();
1317 test_blob_read_past_length(executor);
1318 }
1319
1320 #[test]
1321 fn test_tokio_blob_clone_and_concurrent_read() {
1322 let executor = tokio::Runner::default();
1324 test_blob_clone_and_concurrent_read(executor);
1325 }
1326
1327 #[test]
1328 fn test_tokio_shutdown() {
1329 let executor = tokio::Runner::default();
1330 test_shutdown(executor);
1331 }
1332
1333 #[test]
1334 fn test_tokio_spawn_ref() {
1335 let executor = tokio::Runner::default();
1336 test_spawn_ref(executor);
1337 }
1338
1339 #[test]
1340 #[should_panic]
1341 fn test_tokio_spawn_ref_duplicate() {
1342 let executor = tokio::Runner::default();
1343 test_spawn_ref_duplicate(executor);
1344 }
1345
1346 #[test]
1347 #[should_panic]
1348 fn test_tokio_spawn_duplicate() {
1349 let executor = tokio::Runner::default();
1350 test_spawn_duplicate(executor);
1351 }
1352
1353 #[test]
1354 fn test_tokio_spawn_blocking() {
1355 let executor = tokio::Runner::default();
1356 test_spawn_blocking(executor);
1357 }
1358
1359 #[test]
1360 fn test_tokio_spawn_blocking_panic() {
1361 let executor = tokio::Runner::default();
1362 executor.start(|context| async move {
1363 let handle = context.spawn_blocking(|| {
1364 panic!("blocking task panicked");
1365 });
1366 let result = handle.await;
1367 assert!(matches!(result, Err(Error::Exited)));
1368 });
1369 }
1370
1371 #[test]
1372 fn test_tokio_spawn_blocking_abort() {
1373 let executor = tokio::Runner::default();
1374 test_spawn_blocking_abort(executor);
1375 }
1376
1377 #[test]
1378 fn test_tokio_metrics() {
1379 let executor = tokio::Runner::default();
1380 test_metrics(executor);
1381 }
1382
1383 #[test]
1384 #[should_panic]
1385 fn test_tokio_metrics_label() {
1386 let executor = tokio::Runner::default();
1387 test_metrics_label(executor);
1388 }
1389
1390 #[test]
1391 fn test_tokio_metrics_serve() {
1392 let executor = tokio::Runner::default();
1393 test_metrics_serve(executor);
1394 }
1395}