1use prometheus_client::registry::Metric;
21use std::{
22 future::Future,
23 net::SocketAddr,
24 time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31 if #[cfg(not(target_arch = "wasm32"))] {
32 pub mod tokio;
33 }
34}
35pub mod telemetry;
36mod utils;
37pub use utils::{reschedule, Handle, Signal, Signaler};
38
39const METRICS_PREFIX: &str = "runtime";
41
42#[derive(Error, Debug, PartialEq)]
44pub enum Error {
45 #[error("exited")]
46 Exited,
47 #[error("closed")]
48 Closed,
49 #[error("timeout")]
50 Timeout,
51 #[error("bind failed")]
52 BindFailed,
53 #[error("connection failed")]
54 ConnectionFailed,
55 #[error("write failed")]
56 WriteFailed,
57 #[error("read failed")]
58 ReadFailed,
59 #[error("send failed")]
60 SendFailed,
61 #[error("recv failed")]
62 RecvFailed,
63 #[error("partition creation failed: {0}")]
64 PartitionCreationFailed(String),
65 #[error("partition missing: {0}")]
66 PartitionMissing(String),
67 #[error("partition corrupt: {0}")]
68 PartitionCorrupt(String),
69 #[error("blob open failed: {0}/{1}")]
70 BlobOpenFailed(String, String),
71 #[error("blob missing: {0}/{1}")]
72 BlobMissing(String, String),
73 #[error("blob truncate failed: {0}/{1}")]
74 BlobTruncateFailed(String, String),
75 #[error("blob sync failed: {0}/{1}")]
76 BlobSyncFailed(String, String),
77 #[error("blob close failed: {0}/{1}")]
78 BlobCloseFailed(String, String),
79 #[error("blob insufficient length")]
80 BlobInsufficientLength,
81 #[error("offset overflow")]
82 OffsetOverflow,
83}
84
85pub trait Runner {
88 fn start<F>(self, f: F) -> F::Output
93 where
94 F: Future + Send + 'static,
95 F::Output: Send + 'static;
96}
97
98pub trait Spawner: Clone + Send + Sync + 'static {
100 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
108 where
109 F: FnOnce(Self) -> Fut + Send + 'static,
110 Fut: Future<Output = T> + Send + 'static,
111 T: Send + 'static;
112
113 fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
126 where
127 F: Future<Output = T> + Send + 'static,
128 T: Send + 'static;
129
130 fn stop(&self, value: i32);
137
138 fn stopped(&self) -> Signal;
143}
144
145pub trait Metrics: Clone + Send + Sync + 'static {
147 fn with_label(&self, label: &str) -> Self;
155
156 fn label(&self) -> String;
158
159 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
163
164 fn encode(&self) -> String;
166}
167
168pub trait Clock: Clone + Send + Sync + 'static {
174 fn current(&self) -> SystemTime;
176
177 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
179
180 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
182}
183
184pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
187where
188 L: Listener<Si, St>,
189 Si: Sink,
190 St: Stream,
191{
192 fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
194
195 fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
197}
198
199pub trait Listener<Si, St>: Sync + Send + 'static
202where
203 Si: Sink,
204 St: Stream,
205{
206 fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
208}
209
210pub trait Sink: Sync + Send + 'static {
213 fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
215}
216
217pub trait Stream: Sync + Send + 'static {
220 fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
223}
224
225pub trait Storage<B>: Clone + Send + Sync + 'static
233where
234 B: Blob,
235{
236 fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
241
242 fn remove(
246 &self,
247 partition: &str,
248 name: Option<&[u8]>,
249 ) -> impl Future<Output = Result<(), Error>> + Send;
250
251 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
253}
254
255#[allow(clippy::len_without_is_empty)]
266pub trait Blob: Clone + Send + Sync + 'static {
267 fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
269
270 fn read_at(
275 &self,
276 buf: &mut [u8],
277 offset: u64,
278 ) -> impl Future<Output = Result<(), Error>> + Send;
279
280 fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
282
283 fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
285
286 fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
288
289 fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use commonware_macros::select;
297 use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
298 use prometheus_client::metrics::counter::Counter;
299 use std::panic::{catch_unwind, AssertUnwindSafe};
300 use std::sync::Mutex;
301 use utils::reschedule;
302
303 fn test_error_future(runner: impl Runner) {
304 async fn error_future() -> Result<&'static str, &'static str> {
305 Err("An error occurred")
306 }
307 let result = runner.start(error_future());
308 assert_eq!(result, Err("An error occurred"));
309 }
310
311 fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
312 runner.start(async move {
313 let start = context.current();
315 let sleep_duration = Duration::from_millis(10);
316 context.sleep(sleep_duration).await;
317
318 let end = context.current();
320 assert!(end.duration_since(start).unwrap() >= sleep_duration);
321 });
322 }
323
324 fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
325 runner.start(async move {
326 let now = context.current();
328 context.sleep_until(now + Duration::from_millis(100)).await;
329
330 let elapsed = now.elapsed().unwrap();
332 assert!(elapsed >= Duration::from_millis(100));
333 });
334 }
335
336 fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
337 runner.start(async move {
338 context.spawn(|_| async move {
339 loop {
340 reschedule().await;
341 }
342 });
343 });
344 }
345
346 fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
347 runner.start(async move {
348 let handle = context.spawn(|_| async move {
349 loop {
350 reschedule().await;
351 }
352 });
353 handle.abort();
354 assert_eq!(handle.await, Err(Error::Closed));
355 });
356 }
357
358 fn test_panic_aborts_root(runner: impl Runner) {
359 let result = catch_unwind(AssertUnwindSafe(|| {
360 runner.start(async move {
361 panic!("blah");
362 });
363 }));
364 result.unwrap_err();
365 }
366
367 fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
368 let result = runner.start(async move {
369 let result = context.spawn(|_| async move {
370 panic!("blah");
371 });
372 assert_eq!(result.await, Err(Error::Exited));
373 Result::<(), Error>::Ok(())
374 });
375
376 result.unwrap();
378 }
379
380 fn test_select(runner: impl Runner) {
381 runner.start(async move {
382 let output = Mutex::new(0);
384 select! {
385 v1 = ready(1) => {
386 *output.lock().unwrap() = v1;
387 },
388 v2 = ready(2) => {
389 *output.lock().unwrap() = v2;
390 },
391 };
392 assert_eq!(*output.lock().unwrap(), 1);
393
394 select! {
396 v1 = std::future::pending::<i32>() => {
397 *output.lock().unwrap() = v1;
398 },
399 v2 = ready(2) => {
400 *output.lock().unwrap() = v2;
401 },
402 };
403 assert_eq!(*output.lock().unwrap(), 2);
404 });
405 }
406
407 fn test_select_loop(runner: impl Runner, context: impl Clock) {
409 runner.start(async move {
410 let (mut sender, mut receiver) = mpsc::unbounded();
412 for _ in 0..2 {
413 select! {
414 v = receiver.next() => {
415 panic!("unexpected value: {:?}", v);
416 },
417 _ = context.sleep(Duration::from_millis(100)) => {
418 continue;
419 },
420 };
421 }
422
423 sender.send(0).await.unwrap();
425 sender.send(1).await.unwrap();
426
427 select! {
429 _ = async {} => {
430 },
432 v = receiver.next() => {
433 panic!("unexpected value: {:?}", v);
434 },
435 };
436
437 for i in 0..2 {
439 select! {
440 _ = context.sleep(Duration::from_millis(100)) => {
441 panic!("timeout");
442 },
443 v = receiver.next() => {
444 assert_eq!(v.unwrap(), i);
445 },
446 };
447 }
448 });
449 }
450
451 fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
452 where
453 B: Blob,
454 {
455 runner.start(async move {
456 let partition = "test_partition";
457 let name = b"test_blob";
458
459 let blob = context
461 .open(partition, name)
462 .await
463 .expect("Failed to open blob");
464
465 let data = b"Hello, Storage!";
467 blob.write_at(data, 0)
468 .await
469 .expect("Failed to write to blob");
470
471 blob.sync().await.expect("Failed to sync blob");
473
474 let mut buffer = vec![0u8; data.len()];
476 blob.read_at(&mut buffer, 0)
477 .await
478 .expect("Failed to read from blob");
479 assert_eq!(&buffer, data);
480
481 let length = blob.len().await.expect("Failed to get blob length");
483 assert_eq!(length, data.len() as u64);
484
485 blob.close().await.expect("Failed to close blob");
487
488 let blobs = context
490 .scan(partition)
491 .await
492 .expect("Failed to scan partition");
493 assert!(blobs.contains(&name.to_vec()));
494
495 let blob = context
497 .open(partition, name)
498 .await
499 .expect("Failed to reopen blob");
500
501 let mut buffer = vec![0u8; 7];
503 blob.read_at(&mut buffer, 7)
504 .await
505 .expect("Failed to read data");
506 assert_eq!(&buffer, b"Storage");
507
508 blob.close().await.expect("Failed to close blob");
510
511 context
513 .remove(partition, Some(name))
514 .await
515 .expect("Failed to remove blob");
516
517 let blobs = context
519 .scan(partition)
520 .await
521 .expect("Failed to scan partition");
522 assert!(!blobs.contains(&name.to_vec()));
523
524 context
526 .remove(partition, None)
527 .await
528 .expect("Failed to remove partition");
529
530 let result = context.scan(partition).await;
532 assert!(matches!(result, Err(Error::PartitionMissing(_))));
533 });
534 }
535
536 fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
537 where
538 B: Blob,
539 {
540 runner.start(async move {
541 let partition = "test_partition";
542 let name = b"test_blob_rw";
543
544 let blob = context
546 .open(partition, name)
547 .await
548 .expect("Failed to open blob");
549
550 let data1 = b"Hello";
552 let data2 = b"World";
553 blob.write_at(data1, 0)
554 .await
555 .expect("Failed to write data1");
556 blob.write_at(data2, 5)
557 .await
558 .expect("Failed to write data2");
559
560 let length = blob.len().await.expect("Failed to get blob length");
562 assert_eq!(length, 10);
563
564 let mut buffer = vec![0u8; 10];
566 blob.read_at(&mut buffer, 0)
567 .await
568 .expect("Failed to read data");
569 assert_eq!(&buffer[..5], data1);
570 assert_eq!(&buffer[5..], data2);
571
572 let data3 = b"Store";
574 blob.write_at(data3, 5)
575 .await
576 .expect("Failed to write data3");
577 let length = blob.len().await.expect("Failed to get blob length");
578 assert_eq!(length, 10);
579
580 blob.truncate(5).await.expect("Failed to truncate blob");
582 let length = blob.len().await.expect("Failed to get blob length");
583 assert_eq!(length, 5);
584 let mut buffer = vec![0u8; 5];
585 blob.read_at(&mut buffer, 0)
586 .await
587 .expect("Failed to read data");
588 assert_eq!(&buffer[..5], data1);
589
590 let mut buffer = vec![0u8; 10];
592 let result = blob.read_at(&mut buffer, 0).await;
593 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
594
595 blob.close().await.expect("Failed to close blob");
597 });
598 }
599
600 fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
601 where
602 B: Blob,
603 {
604 runner.start(async move {
605 let partitions = ["partition1", "partition2", "partition3"];
606 let name = b"test_blob_rw";
607
608 for (additional, partition) in partitions.iter().enumerate() {
609 let blob = context
611 .open(partition, name)
612 .await
613 .expect("Failed to open blob");
614
615 let data1 = b"Hello";
617 let data2 = b"World";
618 blob.write_at(data1, 0)
619 .await
620 .expect("Failed to write data1");
621 blob.write_at(data2, 5 + additional as u64)
622 .await
623 .expect("Failed to write data2");
624
625 blob.close().await.expect("Failed to close blob");
627 }
628
629 for (additional, partition) in partitions.iter().enumerate() {
630 let blob = context
632 .open(partition, name)
633 .await
634 .expect("Failed to open blob");
635
636 let mut buffer = vec![0u8; 10 + additional];
638 blob.read_at(&mut buffer, 0)
639 .await
640 .expect("Failed to read data");
641 assert_eq!(&buffer[..5], b"Hello");
642 assert_eq!(&buffer[5 + additional..], b"World");
643
644 blob.close().await.expect("Failed to close blob");
646 }
647 });
648 }
649
650 fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
651 where
652 B: Blob,
653 {
654 runner.start(async move {
655 let partition = "test_partition";
656 let name = b"test_blob_rw";
657
658 let blob = context
660 .open(partition, name)
661 .await
662 .expect("Failed to open blob");
663
664 let mut buffer = vec![0u8; 10];
666 let result = blob.read_at(&mut buffer, 0).await;
667 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
668
669 let data = b"Hello, Storage!";
671 blob.write_at(data, 0)
672 .await
673 .expect("Failed to write to blob");
674
675 let mut buffer = vec![0u8; 20];
677 let result = blob.read_at(&mut buffer, 0).await;
678 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
679 })
680 }
681
682 fn test_blob_clone_and_concurrent_read<B>(
683 runner: impl Runner,
684 context: impl Spawner + Storage<B> + Metrics,
685 ) where
686 B: Blob,
687 {
688 runner.start(async move {
689 let partition = "test_partition";
690 let name = b"test_blob_rw";
691
692 let blob = context
694 .open(partition, name)
695 .await
696 .expect("Failed to open blob");
697
698 let data = b"Hello, Storage!";
700 blob.write_at(data, 0)
701 .await
702 .expect("Failed to write to blob");
703
704 blob.sync().await.expect("Failed to sync blob");
706
707 let check1 = context.with_label("check1").spawn({
709 let blob = blob.clone();
710 move |_| async move {
711 let mut buffer = vec![0u8; data.len()];
712 blob.read_at(&mut buffer, 0)
713 .await
714 .expect("Failed to read from blob");
715 assert_eq!(&buffer, data);
716 }
717 });
718 let check2 = context.with_label("check2").spawn({
719 let blob = blob.clone();
720 move |_| async move {
721 let mut buffer = vec![0u8; data.len()];
722 blob.read_at(&mut buffer, 0)
723 .await
724 .expect("Failed to read from blob");
725 assert_eq!(&buffer, data);
726 }
727 });
728
729 let result = join!(check1, check2);
731 assert!(result.0.is_ok());
732 assert!(result.1.is_ok());
733
734 let mut buffer = vec![0u8; data.len()];
736 blob.read_at(&mut buffer, 0)
737 .await
738 .expect("Failed to read from blob");
739 assert_eq!(&buffer, data);
740
741 let length = blob.len().await.expect("Failed to get blob length");
743 assert_eq!(length, data.len() as u64);
744
745 blob.close().await.expect("Failed to close blob");
747 });
748 }
749
750 fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
751 let kill = 9;
752 runner.start(async move {
753 let before = context
755 .with_label("before")
756 .spawn(move |context| async move {
757 let sig = context.stopped().await;
758 assert_eq!(sig.unwrap(), kill);
759 });
760
761 let after = context
763 .with_label("after")
764 .spawn(move |context| async move {
765 let mut signal = context.stopped();
767 loop {
768 select! {
769 sig = &mut signal => {
770 assert_eq!(sig.unwrap(), kill);
772 break;
773 },
774 _ = context.sleep(Duration::from_millis(10)) => {
775 },
777 }
778 }
779 });
780
781 context.sleep(Duration::from_millis(50)).await;
783
784 context.stop(kill);
786
787 let result = join!(before, after);
789 assert!(result.0.is_ok());
790 assert!(result.1.is_ok());
791 });
792 }
793
794 fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
795 runner.start(async move {
796 let handle = context.spawn_ref();
797 let result = handle(async move { 42 }).await;
798 assert_eq!(result, Ok(42));
799 });
800 }
801
802 fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
803 runner.start(async move {
804 let handle = context.spawn_ref();
805 let result = handle(async move { 42 }).await;
806 assert_eq!(result, Ok(42));
807
808 let handle = context.spawn_ref();
810 let result = handle(async move { 42 }).await;
811 assert_eq!(result, Ok(42));
812 });
813 }
814
815 fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
816 runner.start(async move {
817 let handle = context.spawn_ref();
818 let result = handle(async move { 42 }).await;
819 assert_eq!(result, Ok(42));
820
821 context.spawn(|_| async move { 42 });
823 });
824 }
825
826 fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
827 runner.start(async move {
828 assert_eq!(context.label(), "");
830
831 let counter = Counter::<u64>::default();
833 context.register("test", "test", counter.clone());
834
835 counter.inc();
837
838 let buffer = context.encode();
840 assert!(buffer.contains("test_total 1"));
841
842 let context = context.with_label("nested");
844 let nested_counter = Counter::<u64>::default();
845 context.register("test", "test", nested_counter.clone());
846
847 nested_counter.inc();
849
850 let buffer = context.encode();
852 assert!(buffer.contains("nested_test_total 1"));
853 assert!(buffer.contains("test_total 1"));
854 });
855 }
856
857 fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
858 runner.start(async move {
859 context.with_label(METRICS_PREFIX);
860 })
861 }
862
863 #[test]
864 fn test_deterministic_future() {
865 let (runner, _, _) = deterministic::Executor::default();
866 test_error_future(runner);
867 }
868
869 #[test]
870 fn test_deterministic_clock_sleep() {
871 let (executor, context, _) = deterministic::Executor::default();
872 assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
873 test_clock_sleep(executor, context);
874 }
875
876 #[test]
877 fn test_deterministic_clock_sleep_until() {
878 let (executor, context, _) = deterministic::Executor::default();
879 test_clock_sleep_until(executor, context);
880 }
881
882 #[test]
883 fn test_deterministic_root_finishes() {
884 let (executor, context, _) = deterministic::Executor::default();
885 test_root_finishes(executor, context);
886 }
887
888 #[test]
889 fn test_deterministic_spawn_abort() {
890 let (executor, context, _) = deterministic::Executor::default();
891 test_spawn_abort(executor, context);
892 }
893
894 #[test]
895 fn test_deterministic_panic_aborts_root() {
896 let (runner, _, _) = deterministic::Executor::default();
897 test_panic_aborts_root(runner);
898 }
899
900 #[test]
901 #[should_panic(expected = "blah")]
902 fn test_deterministic_panic_aborts_spawn() {
903 let (executor, context, _) = deterministic::Executor::default();
904 test_panic_aborts_spawn(executor, context);
905 }
906
907 #[test]
908 fn test_deterministic_select() {
909 let (executor, _, _) = deterministic::Executor::default();
910 test_select(executor);
911 }
912
913 #[test]
914 fn test_deterministic_select_loop() {
915 let (executor, context, _) = deterministic::Executor::default();
916 test_select_loop(executor, context);
917 }
918
919 #[test]
920 fn test_deterministic_storage_operations() {
921 let (executor, context, _) = deterministic::Executor::default();
922 test_storage_operations(executor, context);
923 }
924
925 #[test]
926 fn test_deterministic_blob_read_write() {
927 let (executor, context, _) = deterministic::Executor::default();
928 test_blob_read_write(executor, context);
929 }
930
931 #[test]
932 fn test_deterministic_many_partition_read_write() {
933 let (executor, context, _) = deterministic::Executor::default();
934 test_many_partition_read_write(executor, context);
935 }
936
937 #[test]
938 fn test_deterministic_blob_read_past_length() {
939 let (executor, context, _) = deterministic::Executor::default();
940 test_blob_read_past_length(executor, context);
941 }
942
943 #[test]
944 fn test_deterministic_blob_clone_and_concurrent_read() {
945 let (executor, context, _) = deterministic::Executor::default();
947 test_blob_clone_and_concurrent_read(executor, context.clone());
948
949 let buffer = context.encode();
951 assert!(buffer.contains("open_blobs 0"));
952 }
953
954 #[test]
955 fn test_deterministic_shutdown() {
956 let (executor, context, _) = deterministic::Executor::default();
957 test_shutdown(executor, context);
958 }
959
960 #[test]
961 fn test_deterministic_spawn_ref() {
962 let (executor, context, _) = deterministic::Executor::default();
963 test_spawn_ref(executor, context);
964 }
965
966 #[test]
967 #[should_panic]
968 fn test_deterministic_spawn_ref_duplicate() {
969 let (executor, context, _) = deterministic::Executor::default();
970 test_spawn_ref_duplicate(executor, context);
971 }
972
973 #[test]
974 #[should_panic]
975 fn test_deterministic_spawn_duplicate() {
976 let (executor, context, _) = deterministic::Executor::default();
977 test_spawn_duplicate(executor, context);
978 }
979
980 #[test]
981 fn test_deterministic_metrics() {
982 let (executor, context, _) = deterministic::Executor::default();
983 test_metrics(executor, context);
984 }
985
986 #[test]
987 #[should_panic]
988 fn test_deterministic_metrics_label() {
989 let (executor, context, _) = deterministic::Executor::default();
990 test_metrics_label(executor, context);
991 }
992
993 #[test]
994 fn test_tokio_error_future() {
995 let (runner, _) = tokio::Executor::default();
996 test_error_future(runner);
997 }
998
999 #[test]
1000 fn test_tokio_clock_sleep() {
1001 let (executor, context) = tokio::Executor::default();
1002 test_clock_sleep(executor, context);
1003 }
1004
1005 #[test]
1006 fn test_tokio_clock_sleep_until() {
1007 let (executor, context) = tokio::Executor::default();
1008 test_clock_sleep_until(executor, context);
1009 }
1010
1011 #[test]
1012 fn test_tokio_root_finishes() {
1013 let (executor, context) = tokio::Executor::default();
1014 test_root_finishes(executor, context);
1015 }
1016
1017 #[test]
1018 fn test_tokio_spawn_abort() {
1019 let (executor, context) = tokio::Executor::default();
1020 test_spawn_abort(executor, context);
1021 }
1022
1023 #[test]
1024 fn test_tokio_panic_aborts_root() {
1025 let (runner, _) = tokio::Executor::default();
1026 test_panic_aborts_root(runner);
1027 }
1028
1029 #[test]
1030 fn test_tokio_panic_aborts_spawn() {
1031 let (executor, context) = tokio::Executor::default();
1032 test_panic_aborts_spawn(executor, context);
1033 }
1034
1035 #[test]
1036 fn test_tokio_select() {
1037 let (executor, _) = tokio::Executor::default();
1038 test_select(executor);
1039 }
1040
1041 #[test]
1042 fn test_tokio_select_loop() {
1043 let (executor, context) = tokio::Executor::default();
1044 test_select_loop(executor, context);
1045 }
1046
1047 #[test]
1048 fn test_tokio_storage_operations() {
1049 let (executor, context) = tokio::Executor::default();
1050 test_storage_operations(executor, context);
1051 }
1052
1053 #[test]
1054 fn test_tokio_blob_read_write() {
1055 let (executor, context) = tokio::Executor::default();
1056 test_blob_read_write(executor, context);
1057 }
1058
1059 #[test]
1060 fn test_tokio_many_partition_read_write() {
1061 let (executor, context) = tokio::Executor::default();
1062 test_many_partition_read_write(executor, context);
1063 }
1064
1065 #[test]
1066 fn test_tokio_blob_read_past_length() {
1067 let (executor, context) = tokio::Executor::default();
1068 test_blob_read_past_length(executor, context);
1069 }
1070
1071 #[test]
1072 fn test_tokio_blob_clone_and_concurrent_read() {
1073 let (executor, context) = tokio::Executor::default();
1075 test_blob_clone_and_concurrent_read(executor, context.clone());
1076
1077 let buffer = context.encode();
1079 assert!(buffer.contains("open_blobs 0"));
1080 }
1081
1082 #[test]
1083 fn test_tokio_shutdown() {
1084 let (executor, context) = tokio::Executor::default();
1085 test_shutdown(executor, context);
1086 }
1087
1088 #[test]
1089 fn test_tokio_spawn_ref() {
1090 let (executor, context) = tokio::Executor::default();
1091 test_spawn_ref(executor, context);
1092 }
1093
1094 #[test]
1095 #[should_panic]
1096 fn test_tokio_spawn_ref_duplicate() {
1097 let (executor, context) = tokio::Executor::default();
1098 test_spawn_ref_duplicate(executor, context);
1099 }
1100
1101 #[test]
1102 #[should_panic]
1103 fn test_tokio_spawn_duplicate() {
1104 let (executor, context) = tokio::Executor::default();
1105 test_spawn_duplicate(executor, context);
1106 }
1107
1108 #[test]
1109 fn test_tokio_metrics() {
1110 let (executor, context) = tokio::Executor::default();
1111 test_metrics(executor, context);
1112 }
1113
1114 #[test]
1115 #[should_panic]
1116 fn test_tokio_metrics_label() {
1117 let (executor, context) = tokio::Executor::default();
1118 test_metrics_label(executor, context);
1119 }
1120}