1use prometheus_client::registry::Metric;
21use std::{
22 future::Future,
23 net::SocketAddr,
24 time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31 if #[cfg(not(target_arch = "wasm32"))] {
32 pub mod tokio;
33 pub mod benchmarks;
34 }
35}
36mod storage;
37pub mod telemetry;
38mod utils;
39pub use utils::{create_pool, reschedule, Handle, Signal, Signaler};
40
/// Label prefix rejected by `Metrics::scoped_label`, which panics with
/// "using runtime label is not allowed" for any scoped label starting with it
/// (presumably reserved for the runtime's own metrics — confirm with implementors).
const METRICS_PREFIX: &str = "runtime";
43
44#[derive(Error, Debug, PartialEq)]
46pub enum Error {
47 #[error("exited")]
48 Exited,
49 #[error("closed")]
50 Closed,
51 #[error("timeout")]
52 Timeout,
53 #[error("bind failed")]
54 BindFailed,
55 #[error("connection failed")]
56 ConnectionFailed,
57 #[error("write failed")]
58 WriteFailed,
59 #[error("read failed")]
60 ReadFailed,
61 #[error("send failed")]
62 SendFailed,
63 #[error("recv failed")]
64 RecvFailed,
65 #[error("partition creation failed: {0}")]
66 PartitionCreationFailed(String),
67 #[error("partition missing: {0}")]
68 PartitionMissing(String),
69 #[error("partition corrupt: {0}")]
70 PartitionCorrupt(String),
71 #[error("blob open failed: {0}/{1}")]
72 BlobOpenFailed(String, String),
73 #[error("blob missing: {0}/{1}")]
74 BlobMissing(String, String),
75 #[error("blob truncate failed: {0}/{1}")]
76 BlobTruncateFailed(String, String),
77 #[error("blob sync failed: {0}/{1}")]
78 BlobSyncFailed(String, String),
79 #[error("blob close failed: {0}/{1}")]
80 BlobCloseFailed(String, String),
81 #[error("blob insufficient length")]
82 BlobInsufficientLength,
83 #[error("offset overflow")]
84 OffsetOverflow,
85}
86
/// Drives a root future and returns whatever it resolves to.
pub trait Runner {
    /// Consumes the runner, runs `f`, and returns the future's output.
    fn start<F: Future>(self, f: F) -> F::Output;
}
98
99pub trait Spawner: Clone + Send + Sync + 'static {
101 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
109 where
110 F: FnOnce(Self) -> Fut + Send + 'static,
111 Fut: Future<Output = T> + Send + 'static,
112 T: Send + 'static;
113
114 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
127 where
128 F: Future<Output = T> + Send + 'static,
129 T: Send + 'static;
130
131 fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
141 where
142 F: FnOnce() -> T + Send + 'static,
143 T: Send + 'static;
144
145 fn stop(&self, value: i32);
152
153 fn stopped(&self) -> Signal;
158}
159
160pub trait Metrics: Clone + Send + Sync + 'static {
162 fn label(&self) -> String;
164
165 fn with_label(&self, label: &str) -> Self;
173
174 fn scoped_label(&self, label: &str) -> String {
178 let label = if self.label().is_empty() {
179 label.to_string()
180 } else {
181 format!("{}_{}", self.label(), label)
182 };
183 assert!(
184 !label.starts_with(METRICS_PREFIX),
185 "using runtime label is not allowed"
186 );
187 label
188 }
189
190 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
194
195 fn encode(&self) -> String;
197}
198
/// Time source and timers provided by a runtime context.
pub trait Clock: Clone + Send + Sync + 'static {
    /// Returns the current time according to this runtime.
    fn current(&self) -> SystemTime;

    /// Returns a future that completes after `duration`.
    fn sleep(
        &self,
        duration: Duration,
    ) -> impl Future<Output = ()> + Send + 'static;

    /// Returns a future that completes once `deadline` is reached.
    fn sleep_until(
        &self,
        deadline: SystemTime,
    ) -> impl Future<Output = ()> + Send + 'static;
}
214
215pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
218where
219 L: Listener<Si, St>,
220 Si: Sink,
221 St: Stream,
222{
223 fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
225
226 fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
228}
229
230pub trait Listener<Si, St>: Sync + Send + 'static
233where
234 Si: Sink,
235 St: Stream,
236{
237 fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
239}
240
241pub trait Sink: Sync + Send + 'static {
244 fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
246}
247
248pub trait Stream: Sync + Send + 'static {
251 fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
254}
255
256pub trait Storage: Clone + Send + Sync + 'static {
264 type Blob: Blob;
266
267 fn open(
272 &self,
273 partition: &str,
274 name: &[u8],
275 ) -> impl Future<Output = Result<Self::Blob, Error>> + Send;
276
277 fn remove(
281 &self,
282 partition: &str,
283 name: Option<&[u8]>,
284 ) -> impl Future<Output = Result<(), Error>> + Send;
285
286 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
288}
289
290#[allow(clippy::len_without_is_empty)]
301pub trait Blob: Clone + Send + Sync + 'static {
302 fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
304
305 fn read_at(
310 &self,
311 buf: &mut [u8],
312 offset: u64,
313 ) -> impl Future<Output = Result<(), Error>> + Send;
314
315 fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
317
318 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
320
321 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
323
324 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331 use commonware_macros::select;
332 use futures::channel::oneshot;
333 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
334 use prometheus_client::metrics::counter::Counter;
335 use std::collections::HashMap;
336 use std::panic::{catch_unwind, AssertUnwindSafe};
337 use std::str::FromStr;
338 use std::sync::Mutex;
339 use telemetry::metrics;
340 use tracing::error;
341 use utils::reschedule;
342
343 fn test_error_future(runner: impl Runner) {
344 async fn error_future() -> Result<&'static str, &'static str> {
345 Err("An error occurred")
346 }
347 let result = runner.start(error_future());
348 assert_eq!(result, Err("An error occurred"));
349 }
350
351 fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
352 runner.start(async move {
353 let start = context.current();
355 let sleep_duration = Duration::from_millis(10);
356 context.sleep(sleep_duration).await;
357
358 let end = context.current();
360 assert!(end.duration_since(start).unwrap() >= sleep_duration);
361 });
362 }
363
364 fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
365 runner.start(async move {
366 let now = context.current();
368 context.sleep_until(now + Duration::from_millis(100)).await;
369
370 let elapsed = now.elapsed().unwrap();
372 assert!(elapsed >= Duration::from_millis(100));
373 });
374 }
375
376 fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
377 runner.start(async move {
378 context.spawn(|_| async move {
379 loop {
380 reschedule().await;
381 }
382 });
383 });
384 }
385
386 fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
387 runner.start(async move {
388 let handle = context.spawn(|_| async move {
389 loop {
390 reschedule().await;
391 }
392 });
393 handle.abort();
394 assert_eq!(handle.await, Err(Error::Closed));
395 });
396 }
397
398 fn test_panic_aborts_root(runner: impl Runner) {
399 let result = catch_unwind(AssertUnwindSafe(|| {
400 runner.start(async move {
401 panic!("blah");
402 });
403 }));
404 result.unwrap_err();
405 }
406
407 fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
408 let result = runner.start(async move {
409 let result = context.spawn(|_| async move {
410 panic!("blah");
411 });
412 assert_eq!(result.await, Err(Error::Exited));
413 Result::<(), Error>::Ok(())
414 });
415
416 result.unwrap();
418 }
419
420 fn test_select(runner: impl Runner) {
421 runner.start(async move {
422 let output = Mutex::new(0);
424 select! {
425 v1 = ready(1) => {
426 *output.lock().unwrap() = v1;
427 },
428 v2 = ready(2) => {
429 *output.lock().unwrap() = v2;
430 },
431 };
432 assert_eq!(*output.lock().unwrap(), 1);
433
434 select! {
436 v1 = std::future::pending::<i32>() => {
437 *output.lock().unwrap() = v1;
438 },
439 v2 = ready(2) => {
440 *output.lock().unwrap() = v2;
441 },
442 };
443 assert_eq!(*output.lock().unwrap(), 2);
444 });
445 }
446
447 fn test_select_loop(runner: impl Runner, context: impl Clock) {
449 runner.start(async move {
450 let (mut sender, mut receiver) = mpsc::unbounded();
452 for _ in 0..2 {
453 select! {
454 v = receiver.next() => {
455 panic!("unexpected value: {:?}", v);
456 },
457 _ = context.sleep(Duration::from_millis(100)) => {
458 continue;
459 },
460 };
461 }
462
463 sender.send(0).await.unwrap();
465 sender.send(1).await.unwrap();
466
467 select! {
469 _ = async {} => {
470 },
472 v = receiver.next() => {
473 panic!("unexpected value: {:?}", v);
474 },
475 };
476
477 for i in 0..2 {
479 select! {
480 _ = context.sleep(Duration::from_millis(100)) => {
481 panic!("timeout");
482 },
483 v = receiver.next() => {
484 assert_eq!(v.unwrap(), i);
485 },
486 };
487 }
488 });
489 }
490
491 fn test_storage_operations(runner: impl Runner, context: impl Spawner + Storage) {
492 runner.start(async move {
493 let partition = "test_partition";
494 let name = b"test_blob";
495
496 let blob = context
498 .open(partition, name)
499 .await
500 .expect("Failed to open blob");
501
502 let data = b"Hello, Storage!";
504 blob.write_at(data, 0)
505 .await
506 .expect("Failed to write to blob");
507
508 blob.sync().await.expect("Failed to sync blob");
510
511 let mut buffer = vec![0u8; data.len()];
513 blob.read_at(&mut buffer, 0)
514 .await
515 .expect("Failed to read from blob");
516 assert_eq!(&buffer, data);
517
518 let length = blob.len().await.expect("Failed to get blob length");
520 assert_eq!(length, data.len() as u64);
521
522 blob.close().await.expect("Failed to close blob");
524
525 let blobs = context
527 .scan(partition)
528 .await
529 .expect("Failed to scan partition");
530 assert!(blobs.contains(&name.to_vec()));
531
532 let blob = context
534 .open(partition, name)
535 .await
536 .expect("Failed to reopen blob");
537
538 let mut buffer = vec![0u8; 7];
540 blob.read_at(&mut buffer, 7)
541 .await
542 .expect("Failed to read data");
543 assert_eq!(&buffer, b"Storage");
544
545 blob.close().await.expect("Failed to close blob");
547
548 context
550 .remove(partition, Some(name))
551 .await
552 .expect("Failed to remove blob");
553
554 let blobs = context
556 .scan(partition)
557 .await
558 .expect("Failed to scan partition");
559 assert!(!blobs.contains(&name.to_vec()));
560
561 context
563 .remove(partition, None)
564 .await
565 .expect("Failed to remove partition");
566
567 let result = context.scan(partition).await;
569 assert!(matches!(result, Err(Error::PartitionMissing(_))));
570 });
571 }
572
573 fn test_blob_read_write(runner: impl Runner, context: impl Spawner + Storage) {
574 runner.start(async move {
575 let partition = "test_partition";
576 let name = b"test_blob_rw";
577
578 let blob = context
580 .open(partition, name)
581 .await
582 .expect("Failed to open blob");
583
584 let data1 = b"Hello";
586 let data2 = b"World";
587 blob.write_at(data1, 0)
588 .await
589 .expect("Failed to write data1");
590 blob.write_at(data2, 5)
591 .await
592 .expect("Failed to write data2");
593
594 let length = blob.len().await.expect("Failed to get blob length");
596 assert_eq!(length, 10);
597
598 let mut buffer = vec![0u8; 10];
600 blob.read_at(&mut buffer, 0)
601 .await
602 .expect("Failed to read data");
603 assert_eq!(&buffer[..5], data1);
604 assert_eq!(&buffer[5..], data2);
605
606 let data3 = b"Store";
608 blob.write_at(data3, 5)
609 .await
610 .expect("Failed to write data3");
611 let length = blob.len().await.expect("Failed to get blob length");
612 assert_eq!(length, 10);
613
614 blob.truncate(5).await.expect("Failed to truncate blob");
616 let length = blob.len().await.expect("Failed to get blob length");
617 assert_eq!(length, 5);
618 let mut buffer = vec![0u8; 5];
619 blob.read_at(&mut buffer, 0)
620 .await
621 .expect("Failed to read data");
622 assert_eq!(&buffer[..5], data1);
623
624 let mut buffer = vec![0u8; 10];
626 let result = blob.read_at(&mut buffer, 0).await;
627 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
628
629 blob.close().await.expect("Failed to close blob");
631 });
632 }
633
634 fn test_many_partition_read_write(runner: impl Runner, context: impl Spawner + Storage) {
635 runner.start(async move {
636 let partitions = ["partition1", "partition2", "partition3"];
637 let name = b"test_blob_rw";
638
639 for (additional, partition) in partitions.iter().enumerate() {
640 let blob = context
642 .open(partition, name)
643 .await
644 .expect("Failed to open blob");
645
646 let data1 = b"Hello";
648 let data2 = b"World";
649 blob.write_at(data1, 0)
650 .await
651 .expect("Failed to write data1");
652 blob.write_at(data2, 5 + additional as u64)
653 .await
654 .expect("Failed to write data2");
655
656 blob.close().await.expect("Failed to close blob");
658 }
659
660 for (additional, partition) in partitions.iter().enumerate() {
661 let blob = context
663 .open(partition, name)
664 .await
665 .expect("Failed to open blob");
666
667 let mut buffer = vec![0u8; 10 + additional];
669 blob.read_at(&mut buffer, 0)
670 .await
671 .expect("Failed to read data");
672 assert_eq!(&buffer[..5], b"Hello");
673 assert_eq!(&buffer[5 + additional..], b"World");
674
675 blob.close().await.expect("Failed to close blob");
677 }
678 });
679 }
680
681 fn test_blob_read_past_length(runner: impl Runner, context: impl Spawner + Storage) {
682 runner.start(async move {
683 let partition = "test_partition";
684 let name = b"test_blob_rw";
685
686 let blob = context
688 .open(partition, name)
689 .await
690 .expect("Failed to open blob");
691
692 let mut buffer = vec![0u8; 10];
694 let result = blob.read_at(&mut buffer, 0).await;
695 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
696
697 let data = b"Hello, Storage!";
699 blob.write_at(data, 0)
700 .await
701 .expect("Failed to write to blob");
702
703 let mut buffer = vec![0u8; 20];
705 let result = blob.read_at(&mut buffer, 0).await;
706 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
707 })
708 }
709
710 fn test_blob_clone_and_concurrent_read(
711 runner: impl Runner,
712 context: impl Spawner + Storage + Metrics,
713 ) {
714 runner.start(async move {
715 let partition = "test_partition";
716 let name = b"test_blob_rw";
717
718 let blob = context
720 .open(partition, name)
721 .await
722 .expect("Failed to open blob");
723
724 let data = b"Hello, Storage!";
726 blob.write_at(data, 0)
727 .await
728 .expect("Failed to write to blob");
729
730 blob.sync().await.expect("Failed to sync blob");
732
733 let check1 = context.with_label("check1").spawn({
735 let blob = blob.clone();
736 move |_| async move {
737 let mut buffer = vec![0u8; data.len()];
738 blob.read_at(&mut buffer, 0)
739 .await
740 .expect("Failed to read from blob");
741 assert_eq!(&buffer, data);
742 }
743 });
744 let check2 = context.with_label("check2").spawn({
745 let blob = blob.clone();
746 move |_| async move {
747 let mut buffer = vec![0u8; data.len()];
748 blob.read_at(&mut buffer, 0)
749 .await
750 .expect("Failed to read from blob");
751 assert_eq!(&buffer, data);
752 }
753 });
754
755 let result = join!(check1, check2);
757 assert!(result.0.is_ok());
758 assert!(result.1.is_ok());
759
760 let mut buffer = vec![0u8; data.len()];
762 blob.read_at(&mut buffer, 0)
763 .await
764 .expect("Failed to read from blob");
765 assert_eq!(&buffer, data);
766
767 let length = blob.len().await.expect("Failed to get blob length");
769 assert_eq!(length, data.len() as u64);
770
771 blob.close().await.expect("Failed to close blob");
773 });
774 }
775
776 fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
777 let kill = 9;
778 runner.start(async move {
779 let before = context
781 .with_label("before")
782 .spawn(move |context| async move {
783 let sig = context.stopped().await;
784 assert_eq!(sig.unwrap(), kill);
785 });
786
787 let after = context
789 .with_label("after")
790 .spawn(move |context| async move {
791 let mut signal = context.stopped();
793 loop {
794 select! {
795 sig = &mut signal => {
796 assert_eq!(sig.unwrap(), kill);
798 break;
799 },
800 _ = context.sleep(Duration::from_millis(10)) => {
801 },
803 }
804 }
805 });
806
807 context.sleep(Duration::from_millis(50)).await;
809
810 context.stop(kill);
812
813 let result = join!(before, after);
815 assert!(result.0.is_ok());
816 assert!(result.1.is_ok());
817 });
818 }
819
820 fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
821 runner.start(async move {
822 let handle = context.spawn_ref();
823 let result = handle(async move { 42 }).await;
824 assert_eq!(result, Ok(42));
825 });
826 }
827
828 fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
829 runner.start(async move {
830 let handle = context.spawn_ref();
831 let result = handle(async move { 42 }).await;
832 assert_eq!(result, Ok(42));
833
834 let handle = context.spawn_ref();
836 let result = handle(async move { 42 }).await;
837 assert_eq!(result, Ok(42));
838 });
839 }
840
841 fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
842 runner.start(async move {
843 let handle = context.spawn_ref();
844 let result = handle(async move { 42 }).await;
845 assert_eq!(result, Ok(42));
846
847 context.spawn(|_| async move { 42 });
849 });
850 }
851
852 fn test_spawn_blocking(runner: impl Runner, context: impl Spawner) {
853 runner.start(async move {
854 let handle = context.spawn_blocking(|| 42);
855 let result = handle.await;
856 assert_eq!(result, Ok(42));
857 });
858 }
859
860 fn test_spawn_blocking_abort(runner: impl Runner, context: impl Spawner) {
861 runner.start(async move {
862 let (sender, mut receiver) = oneshot::channel();
864 let handle = context.spawn_blocking(move || {
865 loop {
867 if receiver.try_recv().is_ok() {
868 break;
869 }
870 }
871
872 let mut count = 0;
874 loop {
875 count += 1;
876 if count >= 100_000_000 {
877 break;
878 }
879 }
880 count
881 });
882
883 handle.abort();
889 sender.send(()).unwrap();
890
891 assert_eq!(handle.await, Ok(100_000_000));
893 });
894 }
895
896 fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
897 runner.start(async move {
898 assert_eq!(context.label(), "");
900
901 let counter = Counter::<u64>::default();
903 context.register("test", "test", counter.clone());
904
905 counter.inc();
907
908 let buffer = context.encode();
910 assert!(buffer.contains("test_total 1"));
911
912 let context = context.with_label("nested");
914 let nested_counter = Counter::<u64>::default();
915 context.register("test", "test", nested_counter.clone());
916
917 nested_counter.inc();
919
920 let buffer = context.encode();
922 assert!(buffer.contains("nested_test_total 1"));
923 assert!(buffer.contains("test_total 1"));
924 });
925 }
926
927 fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
928 runner.start(async move {
929 context.with_label(METRICS_PREFIX);
930 })
931 }
932
933 fn test_metrics_serve<L, Si, St>(
934 runner: impl Runner,
935 context: impl Clock + Spawner + Metrics + Network<L, Si, St>,
936 ) where
937 L: Listener<Si, St>,
938 Si: Sink,
939 St: Stream,
940 {
941 runner.start(async move {
942 let counter: Counter<u64> = Counter::default();
944 context.register("test_counter", "Test counter", counter.clone());
945 counter.inc();
946
947 let address = SocketAddr::from_str("127.0.0.1:8000").unwrap();
949
950 context
952 .with_label("server")
953 .spawn(move |context| async move {
954 metrics::server::serve(context, address).await;
955 });
956
957 async fn read_line<St: Stream>(stream: &mut St) -> Result<String, Error> {
959 let mut line = Vec::new();
960 loop {
961 let mut byte = [0; 1];
962 stream.recv(&mut byte).await?;
963 if byte[0] == b'\n' {
964 if line.last() == Some(&b'\r') {
965 line.pop(); }
967 break;
968 }
969 line.push(byte[0]);
970 }
971 String::from_utf8(line).map_err(|_| Error::ReadFailed)
972 }
973
974 async fn read_headers<St: Stream>(
975 stream: &mut St,
976 ) -> Result<HashMap<String, String>, Error> {
977 let mut headers = HashMap::new();
978 loop {
979 let line = read_line(stream).await?;
980 if line.is_empty() {
981 break;
982 }
983 let parts: Vec<&str> = line.splitn(2, ": ").collect();
984 if parts.len() == 2 {
985 headers.insert(parts[0].to_string(), parts[1].to_string());
986 }
987 }
988 Ok(headers)
989 }
990
991 async fn read_body<St: Stream>(
992 stream: &mut St,
993 content_length: usize,
994 ) -> Result<String, Error> {
995 let mut body = vec![0; content_length];
996 stream.recv(&mut body).await?;
997 String::from_utf8(body).map_err(|_| Error::ReadFailed)
998 }
999
1000 let client_handle = context
1002 .with_label("client")
1003 .spawn(move |context| async move {
1004 let (_, mut stream) = loop {
1005 match context.dial(address).await {
1006 Ok((sink, stream)) => break (sink, stream),
1007 Err(e) => {
1008 error!(err =?e, "failed to connect");
1010 context.sleep(Duration::from_millis(10)).await;
1011 }
1012 }
1013 };
1014
1015 let status_line = read_line(&mut stream).await.unwrap();
1017 assert_eq!(status_line, "HTTP/1.1 200 OK");
1018
1019 let headers = read_headers(&mut stream).await.unwrap();
1021 let content_length = headers
1022 .get("Content-Length")
1023 .unwrap()
1024 .parse::<usize>()
1025 .unwrap();
1026
1027 let body = read_body(&mut stream, content_length).await.unwrap();
1029 assert!(body.contains("test_counter_total 1"));
1030 });
1031
1032 client_handle.await.unwrap();
1034 });
1035 }
1036
/// Runs the error-future scenario on the deterministic executor.
#[test]
fn test_deterministic_future() {
    let (executor, _, _) = deterministic::Executor::default();
    test_error_future(executor);
}
1042
/// Runs the sleep scenario on the deterministic executor, whose clock is
/// expected to start at the Unix epoch.
#[test]
fn test_deterministic_clock_sleep() {
    let (runner, ctx, _) = deterministic::Executor::default();
    let started_at = ctx.current();
    assert_eq!(started_at, SystemTime::UNIX_EPOCH);
    test_clock_sleep(runner, ctx);
}
1049
/// Runs the sleep-until scenario on the deterministic executor.
#[test]
fn test_deterministic_clock_sleep_until() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_clock_sleep_until(runner, ctx);
}
1055
/// Runs the root-finishes scenario on the deterministic executor.
#[test]
fn test_deterministic_root_finishes() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_root_finishes(runner, ctx);
}
1061
/// Runs the spawn-abort scenario on the deterministic executor.
#[test]
fn test_deterministic_spawn_abort() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_abort(runner, ctx);
}
1067
/// Runs the root-panic scenario on the deterministic executor.
#[test]
fn test_deterministic_panic_aborts_root() {
    let (executor, _, _) = deterministic::Executor::default();
    test_panic_aborts_root(executor);
}
1073
/// Runs the spawned-panic scenario on the deterministic executor, where the
/// panic ("blah") is expected to escape the test itself.
#[test]
#[should_panic(expected = "blah")]
fn test_deterministic_panic_aborts_spawn() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_panic_aborts_spawn(runner, ctx);
}
1080
/// Runs the `select!` scenario on the deterministic executor.
#[test]
fn test_deterministic_select() {
    let (runner, _, _) = deterministic::Executor::default();
    test_select(runner);
}
1086
/// Runs the `select!`-in-a-loop scenario on the deterministic executor.
#[test]
fn test_deterministic_select_loop() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_select_loop(runner, ctx);
}
1092
/// Runs the storage lifecycle scenario on the deterministic executor.
#[test]
fn test_deterministic_storage_operations() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_storage_operations(runner, ctx);
}
1098
/// Runs the blob read/write scenario on the deterministic executor.
#[test]
fn test_deterministic_blob_read_write() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_blob_read_write(runner, ctx);
}
1104
/// Runs the multi-partition read/write scenario on the deterministic executor.
#[test]
fn test_deterministic_many_partition_read_write() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_many_partition_read_write(runner, ctx);
}
1110
/// Runs the read-past-length scenario on the deterministic executor.
#[test]
fn test_deterministic_blob_read_past_length() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_blob_read_past_length(runner, ctx);
}
1116
/// Runs the concurrent-read scenario on the deterministic executor and then
/// checks that the encoded metrics report no open blobs.
#[test]
fn test_deterministic_blob_clone_and_concurrent_read() {
    let (runner, ctx, _) = deterministic::Executor::default();

    // Keep `ctx` around so the metrics can be inspected afterwards.
    test_blob_clone_and_concurrent_read(runner, ctx.clone());

    let encoded = ctx.encode();
    assert!(encoded.contains("open_blobs 0"));
}
1127
/// Runs the shutdown scenario on the deterministic executor.
#[test]
fn test_deterministic_shutdown() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_shutdown(runner, ctx);
}
1133
/// Runs the `spawn_ref` scenario on the deterministic executor.
#[test]
fn test_deterministic_spawn_ref() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_ref(runner, ctx);
}
1139
/// Runs the duplicate `spawn_ref` scenario on the deterministic executor;
/// it must panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_ref_duplicate() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_ref_duplicate(runner, ctx);
}
1146
/// Runs the `spawn_ref`-then-`spawn` scenario on the deterministic executor;
/// it must panic.
#[test]
#[should_panic]
fn test_deterministic_spawn_duplicate() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_duplicate(runner, ctx);
}
1153
/// Runs the blocking-spawn scenario on the deterministic executor.
#[test]
fn test_deterministic_spawn_blocking() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_blocking(runner, ctx);
}
1159
/// A panic inside a blocking task is expected to escape the deterministic
/// executor with its original message.
#[test]
#[should_panic(expected = "blocking task panicked")]
fn test_deterministic_spawn_blocking_panic() {
    let (runner, ctx, _) = deterministic::Executor::default();
    let root = async move {
        let task = ctx.spawn_blocking(|| {
            panic!("blocking task panicked");
        });
        task.await.unwrap();
    };
    runner.start(root);
}
1171
/// Runs the blocking-abort scenario on the deterministic executor.
#[test]
fn test_deterministic_spawn_blocking_abort() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_spawn_blocking_abort(runner, ctx);
}
1177
/// Runs the metrics scenario on the deterministic executor.
#[test]
fn test_deterministic_metrics() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_metrics(runner, ctx);
}
1183
/// Runs the reserved-label scenario on the deterministic executor; it must panic.
#[test]
#[should_panic]
fn test_deterministic_metrics_label() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_metrics_label(runner, ctx);
}
1190
/// Runs the metrics-server scenario on the deterministic executor.
#[test]
fn test_deterministic_metrics_serve() {
    let (runner, ctx, _) = deterministic::Executor::default();
    test_metrics_serve(runner, ctx);
}
1196
/// Runs the error-future scenario on the tokio executor.
#[test]
fn test_tokio_error_future() {
    let (executor, _) = tokio::Executor::default();
    test_error_future(executor);
}
1202
/// Runs the sleep scenario on the tokio executor.
#[test]
fn test_tokio_clock_sleep() {
    let (runner, ctx) = tokio::Executor::default();
    test_clock_sleep(runner, ctx);
}
1208
/// Runs the sleep-until scenario on the tokio executor.
#[test]
fn test_tokio_clock_sleep_until() {
    let (runner, ctx) = tokio::Executor::default();
    test_clock_sleep_until(runner, ctx);
}
1214
/// Runs the root-finishes scenario on the tokio executor.
#[test]
fn test_tokio_root_finishes() {
    let (runner, ctx) = tokio::Executor::default();
    test_root_finishes(runner, ctx);
}
1220
/// Runs the spawn-abort scenario on the tokio executor.
#[test]
fn test_tokio_spawn_abort() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_abort(runner, ctx);
}
1226
/// Runs the root-panic scenario on the tokio executor.
#[test]
fn test_tokio_panic_aborts_root() {
    let (executor, _) = tokio::Executor::default();
    test_panic_aborts_root(executor);
}
1232
/// Runs the spawned-panic scenario on the tokio executor, where the panic is
/// expected to be contained and reported through the handle.
#[test]
fn test_tokio_panic_aborts_spawn() {
    let (runner, ctx) = tokio::Executor::default();
    test_panic_aborts_spawn(runner, ctx);
}
1238
/// Runs the `select!` scenario on the tokio executor.
#[test]
fn test_tokio_select() {
    let (runner, _) = tokio::Executor::default();
    test_select(runner);
}
1244
/// Runs the `select!`-in-a-loop scenario on the tokio executor.
#[test]
fn test_tokio_select_loop() {
    let (runner, ctx) = tokio::Executor::default();
    test_select_loop(runner, ctx);
}
1250
/// Runs the storage lifecycle scenario on the tokio executor.
#[test]
fn test_tokio_storage_operations() {
    let (runner, ctx) = tokio::Executor::default();
    test_storage_operations(runner, ctx);
}
1256
/// Runs the blob read/write scenario on the tokio executor.
#[test]
fn test_tokio_blob_read_write() {
    let (runner, ctx) = tokio::Executor::default();
    test_blob_read_write(runner, ctx);
}
1262
/// Runs the multi-partition read/write scenario on the tokio executor.
#[test]
fn test_tokio_many_partition_read_write() {
    let (runner, ctx) = tokio::Executor::default();
    test_many_partition_read_write(runner, ctx);
}
1268
/// Runs the read-past-length scenario on the tokio executor.
#[test]
fn test_tokio_blob_read_past_length() {
    let (runner, ctx) = tokio::Executor::default();
    test_blob_read_past_length(runner, ctx);
}
1274
/// Runs the concurrent-read scenario on the tokio executor and then checks
/// that the encoded metrics report no open blobs.
#[test]
fn test_tokio_blob_clone_and_concurrent_read() {
    let (runner, ctx) = tokio::Executor::default();

    // Keep `ctx` around so the metrics can be inspected afterwards.
    test_blob_clone_and_concurrent_read(runner, ctx.clone());

    let encoded = ctx.encode();
    assert!(encoded.contains("open_blobs 0"));
}
1285
/// Runs the shutdown scenario on the tokio executor.
#[test]
fn test_tokio_shutdown() {
    let (runner, ctx) = tokio::Executor::default();
    test_shutdown(runner, ctx);
}
1291
/// Runs the `spawn_ref` scenario on the tokio executor.
#[test]
fn test_tokio_spawn_ref() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_ref(runner, ctx);
}
1297
/// Runs the duplicate `spawn_ref` scenario on the tokio executor; it must panic.
#[test]
#[should_panic]
fn test_tokio_spawn_ref_duplicate() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_ref_duplicate(runner, ctx);
}
1304
/// Runs the `spawn_ref`-then-`spawn` scenario on the tokio executor; it must panic.
#[test]
#[should_panic]
fn test_tokio_spawn_duplicate() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_duplicate(runner, ctx);
}
1311
/// Runs the blocking-spawn scenario on the tokio executor.
#[test]
fn test_tokio_spawn_blocking() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_blocking(runner, ctx);
}
1317
/// A panic inside a blocking task on the tokio executor is expected to be
/// contained and reported as `Error::Exited` through the handle.
#[test]
fn test_tokio_spawn_blocking_panic() {
    let (runner, ctx) = tokio::Executor::default();
    let root = async move {
        let task = ctx.spawn_blocking(|| {
            panic!("blocking task panicked");
        });
        let outcome = task.await;
        assert_eq!(outcome, Err(Error::Exited));
    };
    runner.start(root);
}
1329
/// Runs the blocking-abort scenario on the tokio executor.
#[test]
fn test_tokio_spawn_blocking_abort() {
    let (runner, ctx) = tokio::Executor::default();
    test_spawn_blocking_abort(runner, ctx);
}
1335
/// Runs the metrics scenario on the tokio executor.
#[test]
fn test_tokio_metrics() {
    let (runner, ctx) = tokio::Executor::default();
    test_metrics(runner, ctx);
}
1341
/// Runs the reserved-label scenario on the tokio executor; it must panic.
#[test]
#[should_panic]
fn test_tokio_metrics_label() {
    let (runner, ctx) = tokio::Executor::default();
    test_metrics_label(runner, ctx);
}
1348
/// Runs the metrics-server scenario on the tokio executor.
#[test]
fn test_tokio_metrics_serve() {
    let (runner, ctx) = tokio::Executor::default();
    test_metrics_serve(runner, ctx);
}
1354}