1use prometheus_client::registry::Metric;
21use std::{
22 future::Future,
23 net::SocketAddr,
24 time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31 if #[cfg(not(target_arch = "wasm32"))] {
32 pub mod tokio;
33 }
34}
35pub mod telemetry;
36mod utils;
37pub use utils::{reschedule, Handle, Signal, Signaler};
38
39const METRICS_PREFIX: &str = "runtime";
41
42#[derive(Error, Debug, PartialEq)]
44pub enum Error {
45 #[error("exited")]
46 Exited,
47 #[error("closed")]
48 Closed,
49 #[error("timeout")]
50 Timeout,
51 #[error("bind failed")]
52 BindFailed,
53 #[error("connection failed")]
54 ConnectionFailed,
55 #[error("write failed")]
56 WriteFailed,
57 #[error("read failed")]
58 ReadFailed,
59 #[error("send failed")]
60 SendFailed,
61 #[error("recv failed")]
62 RecvFailed,
63 #[error("partition creation failed: {0}")]
64 PartitionCreationFailed(String),
65 #[error("partition missing: {0}")]
66 PartitionMissing(String),
67 #[error("partition corrupt: {0}")]
68 PartitionCorrupt(String),
69 #[error("blob open failed: {0}/{1}")]
70 BlobOpenFailed(String, String),
71 #[error("blob missing: {0}/{1}")]
72 BlobMissing(String, String),
73 #[error("blob truncate failed: {0}/{1}")]
74 BlobTruncateFailed(String, String),
75 #[error("blob sync failed: {0}/{1}")]
76 BlobSyncFailed(String, String),
77 #[error("blob close failed: {0}/{1}")]
78 BlobCloseFailed(String, String),
79 #[error("blob insufficient length")]
80 BlobInsufficientLength,
81 #[error("offset overflow")]
82 OffsetOverflow,
83}
84
85pub trait Runner {
88 fn start<F>(self, f: F) -> F::Output
93 where
94 F: Future + Send + 'static,
95 F::Output: Send + 'static;
96}
97
98pub trait Spawner: Clone + Send + Sync + 'static {
100 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
108 where
109 F: FnOnce(Self) -> Fut + Send + 'static,
110 Fut: Future<Output = T> + Send + 'static,
111 T: Send + 'static;
112
113 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
126 where
127 F: Future<Output = T> + Send + 'static,
128 T: Send + 'static;
129
130 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
140 where
141 F: FnOnce() -> T + Send + 'static,
142 T: Send + 'static;
143
144 fn stop(&self, value: i32);
151
152 fn stopped(&self) -> Signal;
157}
158
159pub trait Metrics: Clone + Send + Sync + 'static {
161 fn label(&self) -> String;
163
164 fn with_label(&self, label: &str) -> Self;
172
173 fn scoped_label(&self, label: &str) -> String {
177 let label = if self.label().is_empty() {
178 label.to_string()
179 } else {
180 format!("{}_{}", self.label(), label)
181 };
182 assert!(
183 !label.starts_with(METRICS_PREFIX),
184 "using runtime label is not allowed"
185 );
186 label
187 }
188
189 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
193
194 fn encode(&self) -> String;
196}
197
198pub trait Clock: Clone + Send + Sync + 'static {
204 fn current(&self) -> SystemTime;
206
207 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
209
210 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
212}
213
214pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
217where
218 L: Listener<Si, St>,
219 Si: Sink,
220 St: Stream,
221{
222 fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
224
225 fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
227}
228
229pub trait Listener<Si, St>: Sync + Send + 'static
232where
233 Si: Sink,
234 St: Stream,
235{
236 fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
238}
239
240pub trait Sink: Sync + Send + 'static {
243 fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
245}
246
247pub trait Stream: Sync + Send + 'static {
250 fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
253}
254
255pub trait Storage<B>: Clone + Send + Sync + 'static
263where
264 B: Blob,
265{
266 fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
271
272 fn remove(
276 &self,
277 partition: &str,
278 name: Option<&[u8]>,
279 ) -> impl Future<Output = Result<(), Error>> + Send;
280
281 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
283}
284
285#[allow(clippy::len_without_is_empty)]
296pub trait Blob: Clone + Send + Sync + 'static {
297 fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
299
300 fn read_at(
305 &self,
306 buf: &mut [u8],
307 offset: u64,
308 ) -> impl Future<Output = Result<(), Error>> + Send;
309
310 fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
312
313 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
315
316 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
318
319 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326 use commonware_macros::select;
327 use futures::channel::oneshot;
328 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
329 use prometheus_client::metrics::counter::Counter;
330 use std::collections::HashMap;
331 use std::panic::{catch_unwind, AssertUnwindSafe};
332 use std::str::FromStr;
333 use std::sync::Mutex;
334 use telemetry::metrics;
335 use tracing::error;
336 use utils::reschedule;
337
338 fn test_error_future(runner: impl Runner) {
339 async fn error_future() -> Result<&'static str, &'static str> {
340 Err("An error occurred")
341 }
342 let result = runner.start(error_future());
343 assert_eq!(result, Err("An error occurred"));
344 }
345
346 fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
347 runner.start(async move {
348 let start = context.current();
350 let sleep_duration = Duration::from_millis(10);
351 context.sleep(sleep_duration).await;
352
353 let end = context.current();
355 assert!(end.duration_since(start).unwrap() >= sleep_duration);
356 });
357 }
358
359 fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
360 runner.start(async move {
361 let now = context.current();
363 context.sleep_until(now + Duration::from_millis(100)).await;
364
365 let elapsed = now.elapsed().unwrap();
367 assert!(elapsed >= Duration::from_millis(100));
368 });
369 }
370
371 fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
372 runner.start(async move {
373 context.spawn(|_| async move {
374 loop {
375 reschedule().await;
376 }
377 });
378 });
379 }
380
381 fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
382 runner.start(async move {
383 let handle = context.spawn(|_| async move {
384 loop {
385 reschedule().await;
386 }
387 });
388 handle.abort();
389 assert_eq!(handle.await, Err(Error::Closed));
390 });
391 }
392
393 fn test_panic_aborts_root(runner: impl Runner) {
394 let result = catch_unwind(AssertUnwindSafe(|| {
395 runner.start(async move {
396 panic!("blah");
397 });
398 }));
399 result.unwrap_err();
400 }
401
402 fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
403 let result = runner.start(async move {
404 let result = context.spawn(|_| async move {
405 panic!("blah");
406 });
407 assert_eq!(result.await, Err(Error::Exited));
408 Result::<(), Error>::Ok(())
409 });
410
411 result.unwrap();
413 }
414
415 fn test_select(runner: impl Runner) {
416 runner.start(async move {
417 let output = Mutex::new(0);
419 select! {
420 v1 = ready(1) => {
421 *output.lock().unwrap() = v1;
422 },
423 v2 = ready(2) => {
424 *output.lock().unwrap() = v2;
425 },
426 };
427 assert_eq!(*output.lock().unwrap(), 1);
428
429 select! {
431 v1 = std::future::pending::<i32>() => {
432 *output.lock().unwrap() = v1;
433 },
434 v2 = ready(2) => {
435 *output.lock().unwrap() = v2;
436 },
437 };
438 assert_eq!(*output.lock().unwrap(), 2);
439 });
440 }
441
442 fn test_select_loop(runner: impl Runner, context: impl Clock) {
444 runner.start(async move {
445 let (mut sender, mut receiver) = mpsc::unbounded();
447 for _ in 0..2 {
448 select! {
449 v = receiver.next() => {
450 panic!("unexpected value: {:?}", v);
451 },
452 _ = context.sleep(Duration::from_millis(100)) => {
453 continue;
454 },
455 };
456 }
457
458 sender.send(0).await.unwrap();
460 sender.send(1).await.unwrap();
461
462 select! {
464 _ = async {} => {
465 },
467 v = receiver.next() => {
468 panic!("unexpected value: {:?}", v);
469 },
470 };
471
472 for i in 0..2 {
474 select! {
475 _ = context.sleep(Duration::from_millis(100)) => {
476 panic!("timeout");
477 },
478 v = receiver.next() => {
479 assert_eq!(v.unwrap(), i);
480 },
481 };
482 }
483 });
484 }
485
486 fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
487 where
488 B: Blob,
489 {
490 runner.start(async move {
491 let partition = "test_partition";
492 let name = b"test_blob";
493
494 let blob = context
496 .open(partition, name)
497 .await
498 .expect("Failed to open blob");
499
500 let data = b"Hello, Storage!";
502 blob.write_at(data, 0)
503 .await
504 .expect("Failed to write to blob");
505
506 blob.sync().await.expect("Failed to sync blob");
508
509 let mut buffer = vec![0u8; data.len()];
511 blob.read_at(&mut buffer, 0)
512 .await
513 .expect("Failed to read from blob");
514 assert_eq!(&buffer, data);
515
516 let length = blob.len().await.expect("Failed to get blob length");
518 assert_eq!(length, data.len() as u64);
519
520 blob.close().await.expect("Failed to close blob");
522
523 let blobs = context
525 .scan(partition)
526 .await
527 .expect("Failed to scan partition");
528 assert!(blobs.contains(&name.to_vec()));
529
530 let blob = context
532 .open(partition, name)
533 .await
534 .expect("Failed to reopen blob");
535
536 let mut buffer = vec![0u8; 7];
538 blob.read_at(&mut buffer, 7)
539 .await
540 .expect("Failed to read data");
541 assert_eq!(&buffer, b"Storage");
542
543 blob.close().await.expect("Failed to close blob");
545
546 context
548 .remove(partition, Some(name))
549 .await
550 .expect("Failed to remove blob");
551
552 let blobs = context
554 .scan(partition)
555 .await
556 .expect("Failed to scan partition");
557 assert!(!blobs.contains(&name.to_vec()));
558
559 context
561 .remove(partition, None)
562 .await
563 .expect("Failed to remove partition");
564
565 let result = context.scan(partition).await;
567 assert!(matches!(result, Err(Error::PartitionMissing(_))));
568 });
569 }
570
571 fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
572 where
573 B: Blob,
574 {
575 runner.start(async move {
576 let partition = "test_partition";
577 let name = b"test_blob_rw";
578
579 let blob = context
581 .open(partition, name)
582 .await
583 .expect("Failed to open blob");
584
585 let data1 = b"Hello";
587 let data2 = b"World";
588 blob.write_at(data1, 0)
589 .await
590 .expect("Failed to write data1");
591 blob.write_at(data2, 5)
592 .await
593 .expect("Failed to write data2");
594
595 let length = blob.len().await.expect("Failed to get blob length");
597 assert_eq!(length, 10);
598
599 let mut buffer = vec![0u8; 10];
601 blob.read_at(&mut buffer, 0)
602 .await
603 .expect("Failed to read data");
604 assert_eq!(&buffer[..5], data1);
605 assert_eq!(&buffer[5..], data2);
606
607 let data3 = b"Store";
609 blob.write_at(data3, 5)
610 .await
611 .expect("Failed to write data3");
612 let length = blob.len().await.expect("Failed to get blob length");
613 assert_eq!(length, 10);
614
615 blob.truncate(5).await.expect("Failed to truncate blob");
617 let length = blob.len().await.expect("Failed to get blob length");
618 assert_eq!(length, 5);
619 let mut buffer = vec![0u8; 5];
620 blob.read_at(&mut buffer, 0)
621 .await
622 .expect("Failed to read data");
623 assert_eq!(&buffer[..5], data1);
624
625 let mut buffer = vec![0u8; 10];
627 let result = blob.read_at(&mut buffer, 0).await;
628 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
629
630 blob.close().await.expect("Failed to close blob");
632 });
633 }
634
635 fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
636 where
637 B: Blob,
638 {
639 runner.start(async move {
640 let partitions = ["partition1", "partition2", "partition3"];
641 let name = b"test_blob_rw";
642
643 for (additional, partition) in partitions.iter().enumerate() {
644 let blob = context
646 .open(partition, name)
647 .await
648 .expect("Failed to open blob");
649
650 let data1 = b"Hello";
652 let data2 = b"World";
653 blob.write_at(data1, 0)
654 .await
655 .expect("Failed to write data1");
656 blob.write_at(data2, 5 + additional as u64)
657 .await
658 .expect("Failed to write data2");
659
660 blob.close().await.expect("Failed to close blob");
662 }
663
664 for (additional, partition) in partitions.iter().enumerate() {
665 let blob = context
667 .open(partition, name)
668 .await
669 .expect("Failed to open blob");
670
671 let mut buffer = vec![0u8; 10 + additional];
673 blob.read_at(&mut buffer, 0)
674 .await
675 .expect("Failed to read data");
676 assert_eq!(&buffer[..5], b"Hello");
677 assert_eq!(&buffer[5 + additional..], b"World");
678
679 blob.close().await.expect("Failed to close blob");
681 }
682 });
683 }
684
685 fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
686 where
687 B: Blob,
688 {
689 runner.start(async move {
690 let partition = "test_partition";
691 let name = b"test_blob_rw";
692
693 let blob = context
695 .open(partition, name)
696 .await
697 .expect("Failed to open blob");
698
699 let mut buffer = vec![0u8; 10];
701 let result = blob.read_at(&mut buffer, 0).await;
702 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
703
704 let data = b"Hello, Storage!";
706 blob.write_at(data, 0)
707 .await
708 .expect("Failed to write to blob");
709
710 let mut buffer = vec![0u8; 20];
712 let result = blob.read_at(&mut buffer, 0).await;
713 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
714 })
715 }
716
717 fn test_blob_clone_and_concurrent_read<B>(
718 runner: impl Runner,
719 context: impl Spawner + Storage<B> + Metrics,
720 ) where
721 B: Blob,
722 {
723 runner.start(async move {
724 let partition = "test_partition";
725 let name = b"test_blob_rw";
726
727 let blob = context
729 .open(partition, name)
730 .await
731 .expect("Failed to open blob");
732
733 let data = b"Hello, Storage!";
735 blob.write_at(data, 0)
736 .await
737 .expect("Failed to write to blob");
738
739 blob.sync().await.expect("Failed to sync blob");
741
742 let check1 = context.with_label("check1").spawn({
744 let blob = blob.clone();
745 move |_| async move {
746 let mut buffer = vec![0u8; data.len()];
747 blob.read_at(&mut buffer, 0)
748 .await
749 .expect("Failed to read from blob");
750 assert_eq!(&buffer, data);
751 }
752 });
753 let check2 = context.with_label("check2").spawn({
754 let blob = blob.clone();
755 move |_| async move {
756 let mut buffer = vec![0u8; data.len()];
757 blob.read_at(&mut buffer, 0)
758 .await
759 .expect("Failed to read from blob");
760 assert_eq!(&buffer, data);
761 }
762 });
763
764 let result = join!(check1, check2);
766 assert!(result.0.is_ok());
767 assert!(result.1.is_ok());
768
769 let mut buffer = vec![0u8; data.len()];
771 blob.read_at(&mut buffer, 0)
772 .await
773 .expect("Failed to read from blob");
774 assert_eq!(&buffer, data);
775
776 let length = blob.len().await.expect("Failed to get blob length");
778 assert_eq!(length, data.len() as u64);
779
780 blob.close().await.expect("Failed to close blob");
782 });
783 }
784
785 fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
786 let kill = 9;
787 runner.start(async move {
788 let before = context
790 .with_label("before")
791 .spawn(move |context| async move {
792 let sig = context.stopped().await;
793 assert_eq!(sig.unwrap(), kill);
794 });
795
796 let after = context
798 .with_label("after")
799 .spawn(move |context| async move {
800 let mut signal = context.stopped();
802 loop {
803 select! {
804 sig = &mut signal => {
805 assert_eq!(sig.unwrap(), kill);
807 break;
808 },
809 _ = context.sleep(Duration::from_millis(10)) => {
810 },
812 }
813 }
814 });
815
816 context.sleep(Duration::from_millis(50)).await;
818
819 context.stop(kill);
821
822 let result = join!(before, after);
824 assert!(result.0.is_ok());
825 assert!(result.1.is_ok());
826 });
827 }
828
829 fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
830 runner.start(async move {
831 let handle = context.spawn_ref();
832 let result = handle(async move { 42 }).await;
833 assert_eq!(result, Ok(42));
834 });
835 }
836
837 fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
838 runner.start(async move {
839 let handle = context.spawn_ref();
840 let result = handle(async move { 42 }).await;
841 assert_eq!(result, Ok(42));
842
843 let handle = context.spawn_ref();
845 let result = handle(async move { 42 }).await;
846 assert_eq!(result, Ok(42));
847 });
848 }
849
850 fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
851 runner.start(async move {
852 let handle = context.spawn_ref();
853 let result = handle(async move { 42 }).await;
854 assert_eq!(result, Ok(42));
855
856 context.spawn(|_| async move { 42 });
858 });
859 }
860
861 fn test_spawn_blocking(runner: impl Runner, context: impl Spawner) {
862 runner.start(async move {
863 let handle = context.spawn_blocking(|| 42);
864 let result = handle.await;
865 assert_eq!(result, Ok(42));
866 });
867 }
868
869 fn test_spawn_blocking_abort(runner: impl Runner, context: impl Spawner) {
870 runner.start(async move {
871 let (sender, mut receiver) = oneshot::channel();
873 let handle = context.spawn_blocking(move || {
874 loop {
876 if receiver.try_recv().is_ok() {
877 break;
878 }
879 }
880
881 let mut count = 0;
883 loop {
884 count += 1;
885 if count >= 100_000_000 {
886 break;
887 }
888 }
889 count
890 });
891
892 handle.abort();
898 sender.send(()).unwrap();
899
900 assert_eq!(handle.await, Ok(100_000_000));
902 });
903 }
904
905 fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
906 runner.start(async move {
907 assert_eq!(context.label(), "");
909
910 let counter = Counter::<u64>::default();
912 context.register("test", "test", counter.clone());
913
914 counter.inc();
916
917 let buffer = context.encode();
919 assert!(buffer.contains("test_total 1"));
920
921 let context = context.with_label("nested");
923 let nested_counter = Counter::<u64>::default();
924 context.register("test", "test", nested_counter.clone());
925
926 nested_counter.inc();
928
929 let buffer = context.encode();
931 assert!(buffer.contains("nested_test_total 1"));
932 assert!(buffer.contains("test_total 1"));
933 });
934 }
935
936 fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
937 runner.start(async move {
938 context.with_label(METRICS_PREFIX);
939 })
940 }
941
942 fn test_metrics_serve<L, Si, St>(
943 runner: impl Runner,
944 context: impl Clock + Spawner + Metrics + Network<L, Si, St>,
945 ) where
946 L: Listener<Si, St>,
947 Si: Sink,
948 St: Stream,
949 {
950 runner.start(async move {
951 let counter: Counter<u64> = Counter::default();
953 context.register("test_counter", "Test counter", counter.clone());
954 counter.inc();
955
956 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
958
959 context
961 .with_label("server")
962 .spawn(move |context| async move {
963 metrics::server::serve(context, address).await;
964 });
965
966 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
968 let mut line = Vec::new();
969 loop {
970 let mut byte = [0; 1];
971 stream.recv(&mut byte).await?;
972 if byte[0] == b'\n' {
973 if line.last() == Some(&b'\r') {
974 line.pop(); }
976 break;
977 }
978 line.push(byte[0]);
979 }
980 String::from_utf8(line).map_err(|_| Error::ReadFailed)
981 }
982
983 async fn read_headers<St: Stream>(
984 stream: &mut St,
985 ) -> Result<HashMap<String, String>, Error> {
986 let mut headers = HashMap::new();
987 loop {
988 let line = read_line(stream).await?;
989 if line.is_empty() {
990 break;
991 }
992 let parts: Vec<&str> = line.splitn(2, ": ").collect();
993 if parts.len() == 2 {
994 headers.insert(parts[0].to_string(), parts[1].to_string());
995 }
996 }
997 Ok(headers)
998 }
999
1000 async fn read_body<St: Stream>(
1001 stream: &mut St,
1002 content_length: usize,
1003 ) -> Result<String, Error> {
1004 let mut body = vec![0; content_length];
1005 stream.recv(&mut body).await?;
1006 String::from_utf8(body).map_err(|_| Error::ReadFailed)
1007 }
1008
1009 let client_handle = context
1011 .with_label("client")
1012 .spawn(move |context| async move {
1013 let (_, mut stream) = loop {
1014 match context.dial(address).await {
1015 Ok((sink, stream)) => break (sink, stream),
1016 Err(e) => {
1017 error!(err =?e, "failed to connect");
1019 context.sleep(Duration::from_millis(10)).await;
1020 }
1021 }
1022 };
1023
1024 let status_line = read_line(&mut stream).await.unwrap();
1026 assert_eq!(status_line, "HTTP/1.1 200 OK");
1027
1028 let headers = read_headers(&mut stream).await.unwrap();
1030 let content_length = headers
1031 .get("Content-Length")
1032 .unwrap()
1033 .parse::<usize>()
1034 .unwrap();
1035
1036 let body = read_body(&mut stream, content_length).await.unwrap();
1038 assert!(body.contains("test_counter_total 1"));
1039 });
1040
1041 client_handle.await.unwrap();
1043 });
1044 }
1045
1046 #[test]
1047 fn test_deterministic_future() {
1048 let (runner, _, _) = deterministic::Executor::default();
1049 test_error_future(runner);
1050 }
1051
1052 #[test]
1053 fn test_deterministic_clock_sleep() {
1054 let (executor, context, _) = deterministic::Executor::default();
1055 assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
1056 test_clock_sleep(executor, context);
1057 }
1058
1059 #[test]
1060 fn test_deterministic_clock_sleep_until() {
1061 let (executor, context, _) = deterministic::Executor::default();
1062 test_clock_sleep_until(executor, context);
1063 }
1064
1065 #[test]
1066 fn test_deterministic_root_finishes() {
1067 let (executor, context, _) = deterministic::Executor::default();
1068 test_root_finishes(executor, context);
1069 }
1070
1071 #[test]
1072 fn test_deterministic_spawn_abort() {
1073 let (executor, context, _) = deterministic::Executor::default();
1074 test_spawn_abort(executor, context);
1075 }
1076
1077 #[test]
1078 fn test_deterministic_panic_aborts_root() {
1079 let (runner, _, _) = deterministic::Executor::default();
1080 test_panic_aborts_root(runner);
1081 }
1082
1083 #[test]
1084 #[should_panic(expected = "blah")]
1085 fn test_deterministic_panic_aborts_spawn() {
1086 let (executor, context, _) = deterministic::Executor::default();
1087 test_panic_aborts_spawn(executor, context);
1088 }
1089
1090 #[test]
1091 fn test_deterministic_select() {
1092 let (executor, _, _) = deterministic::Executor::default();
1093 test_select(executor);
1094 }
1095
1096 #[test]
1097 fn test_deterministic_select_loop() {
1098 let (executor, context, _) = deterministic::Executor::default();
1099 test_select_loop(executor, context);
1100 }
1101
1102 #[test]
1103 fn test_deterministic_storage_operations() {
1104 let (executor, context, _) = deterministic::Executor::default();
1105 test_storage_operations(executor, context);
1106 }
1107
1108 #[test]
1109 fn test_deterministic_blob_read_write() {
1110 let (executor, context, _) = deterministic::Executor::default();
1111 test_blob_read_write(executor, context);
1112 }
1113
1114 #[test]
1115 fn test_deterministic_many_partition_read_write() {
1116 let (executor, context, _) = deterministic::Executor::default();
1117 test_many_partition_read_write(executor, context);
1118 }
1119
1120 #[test]
1121 fn test_deterministic_blob_read_past_length() {
1122 let (executor, context, _) = deterministic::Executor::default();
1123 test_blob_read_past_length(executor, context);
1124 }
1125
1126 #[test]
1127 fn test_deterministic_blob_clone_and_concurrent_read() {
1128 let (executor, context, _) = deterministic::Executor::default();
1130 test_blob_clone_and_concurrent_read(executor, context.clone());
1131
1132 let buffer = context.encode();
1134 assert!(buffer.contains("open_blobs 0"));
1135 }
1136
1137 #[test]
1138 fn test_deterministic_shutdown() {
1139 let (executor, context, _) = deterministic::Executor::default();
1140 test_shutdown(executor, context);
1141 }
1142
1143 #[test]
1144 fn test_deterministic_spawn_ref() {
1145 let (executor, context, _) = deterministic::Executor::default();
1146 test_spawn_ref(executor, context);
1147 }
1148
1149 #[test]
1150 #[should_panic]
1151 fn test_deterministic_spawn_ref_duplicate() {
1152 let (executor, context, _) = deterministic::Executor::default();
1153 test_spawn_ref_duplicate(executor, context);
1154 }
1155
1156 #[test]
1157 #[should_panic]
1158 fn test_deterministic_spawn_duplicate() {
1159 let (executor, context, _) = deterministic::Executor::default();
1160 test_spawn_duplicate(executor, context);
1161 }
1162
1163 #[test]
1164 fn test_deterministic_spawn_blocking() {
1165 let (executor, context, _) = deterministic::Executor::default();
1166 test_spawn_blocking(executor, context);
1167 }
1168
1169 #[test]
1170 #[should_panic(expected = "blocking task panicked")]
1171 fn test_deterministic_spawn_blocking_panic() {
1172 let (executor, context, _) = deterministic::Executor::default();
1173 executor.start(async move {
1174 let handle = context.spawn_blocking(|| {
1175 panic!("blocking task panicked");
1176 });
1177 handle.await.unwrap();
1178 });
1179 }
1180
1181 #[test]
1182 fn test_deterministic_spawn_blocking_abort() {
1183 let (executor, context, _) = deterministic::Executor::default();
1184 test_spawn_blocking_abort(executor, context);
1185 }
1186
1187 #[test]
1188 fn test_deterministic_metrics() {
1189 let (executor, context, _) = deterministic::Executor::default();
1190 test_metrics(executor, context);
1191 }
1192
1193 #[test]
1194 #[should_panic]
1195 fn test_deterministic_metrics_label() {
1196 let (executor, context, _) = deterministic::Executor::default();
1197 test_metrics_label(executor, context);
1198 }
1199
1200 #[test]
1201 fn test_deterministic_metrics_serve() {
1202 let (executor, context, _) = deterministic::Executor::default();
1203 test_metrics_serve(executor, context);
1204 }
1205
1206 #[test]
1207 fn test_tokio_error_future() {
1208 let (runner, _) = tokio::Executor::default();
1209 test_error_future(runner);
1210 }
1211
1212 #[test]
1213 fn test_tokio_clock_sleep() {
1214 let (executor, context) = tokio::Executor::default();
1215 test_clock_sleep(executor, context);
1216 }
1217
1218 #[test]
1219 fn test_tokio_clock_sleep_until() {
1220 let (executor, context) = tokio::Executor::default();
1221 test_clock_sleep_until(executor, context);
1222 }
1223
1224 #[test]
1225 fn test_tokio_root_finishes() {
1226 let (executor, context) = tokio::Executor::default();
1227 test_root_finishes(executor, context);
1228 }
1229
1230 #[test]
1231 fn test_tokio_spawn_abort() {
1232 let (executor, context) = tokio::Executor::default();
1233 test_spawn_abort(executor, context);
1234 }
1235
1236 #[test]
1237 fn test_tokio_panic_aborts_root() {
1238 let (runner, _) = tokio::Executor::default();
1239 test_panic_aborts_root(runner);
1240 }
1241
1242 #[test]
1243 fn test_tokio_panic_aborts_spawn() {
1244 let (executor, context) = tokio::Executor::default();
1245 test_panic_aborts_spawn(executor, context);
1246 }
1247
1248 #[test]
1249 fn test_tokio_select() {
1250 let (executor, _) = tokio::Executor::default();
1251 test_select(executor);
1252 }
1253
1254 #[test]
1255 fn test_tokio_select_loop() {
1256 let (executor, context) = tokio::Executor::default();
1257 test_select_loop(executor, context);
1258 }
1259
1260 #[test]
1261 fn test_tokio_storage_operations() {
1262 let (executor, context) = tokio::Executor::default();
1263 test_storage_operations(executor, context);
1264 }
1265
1266 #[test]
1267 fn test_tokio_blob_read_write() {
1268 let (executor, context) = tokio::Executor::default();
1269 test_blob_read_write(executor, context);
1270 }
1271
1272 #[test]
1273 fn test_tokio_many_partition_read_write() {
1274 let (executor, context) = tokio::Executor::default();
1275 test_many_partition_read_write(executor, context);
1276 }
1277
1278 #[test]
1279 fn test_tokio_blob_read_past_length() {
1280 let (executor, context) = tokio::Executor::default();
1281 test_blob_read_past_length(executor, context);
1282 }
1283
1284 #[test]
1285 fn test_tokio_blob_clone_and_concurrent_read() {
1286 let (executor, context) = tokio::Executor::default();
1288 test_blob_clone_and_concurrent_read(executor, context.clone());
1289
1290 let buffer = context.encode();
1292 assert!(buffer.contains("open_blobs 0"));
1293 }
1294
1295 #[test]
1296 fn test_tokio_shutdown() {
1297 let (executor, context) = tokio::Executor::default();
1298 test_shutdown(executor, context);
1299 }
1300
1301 #[test]
1302 fn test_tokio_spawn_ref() {
1303 let (executor, context) = tokio::Executor::default();
1304 test_spawn_ref(executor, context);
1305 }
1306
1307 #[test]
1308 #[should_panic]
1309 fn test_tokio_spawn_ref_duplicate() {
1310 let (executor, context) = tokio::Executor::default();
1311 test_spawn_ref_duplicate(executor, context);
1312 }
1313
1314 #[test]
1315 #[should_panic]
1316 fn test_tokio_spawn_duplicate() {
1317 let (executor, context) = tokio::Executor::default();
1318 test_spawn_duplicate(executor, context);
1319 }
1320
1321 #[test]
1322 fn test_tokio_spawn_blocking() {
1323 let (executor, context) = tokio::Executor::default();
1324 test_spawn_blocking(executor, context);
1325 }
1326
1327 #[test]
1328 fn test_tokio_spawn_blocking_panic() {
1329 let (executor, context) = tokio::Executor::default();
1330 executor.start(async move {
1331 let handle = context.spawn_blocking(|| {
1332 panic!("blocking task panicked");
1333 });
1334 let result = handle.await;
1335 assert_eq!(result, Err(Error::Exited));
1336 });
1337 }
1338
1339 #[test]
1340 fn test_tokio_spawn_blocking_abort() {
1341 let (executor, context) = tokio::Executor::default();
1342 test_spawn_blocking_abort(executor, context);
1343 }
1344
1345 #[test]
1346 fn test_tokio_metrics() {
1347 let (executor, context) = tokio::Executor::default();
1348 test_metrics(executor, context);
1349 }
1350
1351 #[test]
1352 #[should_panic]
1353 fn test_tokio_metrics_label() {
1354 let (executor, context) = tokio::Executor::default();
1355 test_metrics_label(executor, context);
1356 }
1357
1358 #[test]
1359 fn test_tokio_metrics_serve() {
1360 let (executor, context) = tokio::Executor::default();
1361 test_metrics_serve(executor, context);
1362 }
1363}