1use prometheus_client::registry::Metric;
21use std::{
22    future::Future,
23    net::SocketAddr,
24    time::{Duration, SystemTime},
25};
26use thiserror::Error;
27
28pub mod deterministic;
29pub mod mocks;
30cfg_if::cfg_if! {
31    if #[cfg(not(target_arch = "wasm32"))] {
32        pub mod tokio;
33    }
34}
35pub mod telemetry;
36mod utils;
37pub use utils::{reschedule, Handle, Signal, Signaler};
38
39const METRICS_PREFIX: &str = "runtime";
41
42#[derive(Error, Debug, PartialEq)]
44pub enum Error {
45    #[error("exited")]
46    Exited,
47    #[error("closed")]
48    Closed,
49    #[error("timeout")]
50    Timeout,
51    #[error("bind failed")]
52    BindFailed,
53    #[error("connection failed")]
54    ConnectionFailed,
55    #[error("write failed")]
56    WriteFailed,
57    #[error("read failed")]
58    ReadFailed,
59    #[error("send failed")]
60    SendFailed,
61    #[error("recv failed")]
62    RecvFailed,
63    #[error("partition creation failed: {0}")]
64    PartitionCreationFailed(String),
65    #[error("partition missing: {0}")]
66    PartitionMissing(String),
67    #[error("partition corrupt: {0}")]
68    PartitionCorrupt(String),
69    #[error("blob open failed: {0}/{1}")]
70    BlobOpenFailed(String, String),
71    #[error("blob missing: {0}/{1}")]
72    BlobMissing(String, String),
73    #[error("blob truncate failed: {0}/{1}")]
74    BlobTruncateFailed(String, String),
75    #[error("blob sync failed: {0}/{1}")]
76    BlobSyncFailed(String, String),
77    #[error("blob close failed: {0}/{1}")]
78    BlobCloseFailed(String, String),
79    #[error("blob insufficient length")]
80    BlobInsufficientLength,
81    #[error("offset overflow")]
82    OffsetOverflow,
83}
84
85pub trait Runner {
88    fn start<F>(self, f: F) -> F::Output
93    where
94        F: Future + Send + 'static,
95        F::Output: Send + 'static;
96}
97
98pub trait Spawner: Clone + Send + Sync + 'static {
100    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
108    where
109        F: FnOnce(Self) -> Fut + Send + 'static,
110        Fut: Future<Output = T> + Send + 'static,
111        T: Send + 'static;
112
113    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
126    where
127        F: Future<Output = T> + Send + 'static,
128        T: Send + 'static;
129
130    fn stop(&self, value: i32);
137
138    fn stopped(&self) -> Signal;
143}
144
145pub trait Metrics: Clone + Send + Sync + 'static {
147    fn with_label(&self, label: &str) -> Self;
155
156    fn label(&self) -> String;
158
159    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric);
163
164    fn encode(&self) -> String;
166}
167
168pub trait Clock: Clone + Send + Sync + 'static {
174    fn current(&self) -> SystemTime;
176
177    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
179
180    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static;
182}
183
184pub trait Network<L, Si, St>: Clone + Send + Sync + 'static
187where
188    L: Listener<Si, St>,
189    Si: Sink,
190    St: Stream,
191{
192    fn bind(&self, socket: SocketAddr) -> impl Future<Output = Result<L, Error>> + Send;
194
195    fn dial(&self, socket: SocketAddr) -> impl Future<Output = Result<(Si, St), Error>> + Send;
197}
198
199pub trait Listener<Si, St>: Sync + Send + 'static
202where
203    Si: Sink,
204    St: Stream,
205{
206    fn accept(&mut self) -> impl Future<Output = Result<(SocketAddr, Si, St), Error>> + Send;
208}
209
210pub trait Sink: Sync + Send + 'static {
213    fn send(&mut self, msg: &[u8]) -> impl Future<Output = Result<(), Error>> + Send;
215}
216
217pub trait Stream: Sync + Send + 'static {
220    fn recv(&mut self, buf: &mut [u8]) -> impl Future<Output = Result<(), Error>> + Send;
223}
224
225pub trait Storage<B>: Clone + Send + Sync + 'static
233where
234    B: Blob,
235{
236    fn open(&self, partition: &str, name: &[u8]) -> impl Future<Output = Result<B, Error>> + Send;
241
242    fn remove(
246        &self,
247        partition: &str,
248        name: Option<&[u8]>,
249    ) -> impl Future<Output = Result<(), Error>> + Send;
250
251    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send;
253}
254
255#[allow(clippy::len_without_is_empty)]
266pub trait Blob: Clone + Send + Sync + 'static {
267    fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
269
270    fn read_at(
275        &self,
276        buf: &mut [u8],
277        offset: u64,
278    ) -> impl Future<Output = Result<(), Error>> + Send;
279
280    fn write_at(&self, buf: &[u8], offset: u64) -> impl Future<Output = Result<(), Error>> + Send;
282
283    fn truncate(&self, len: u64) -> impl Future<Output = Result<(), Error>> + Send;
285
286    fn sync(&self) -> impl Future<Output = Result<(), Error>> + Send;
288
289    fn close(self) -> impl Future<Output = Result<(), Error>> + Send;
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use commonware_macros::select;
297    use futures::{channel::mpsc, future::ready, join, SinkExt, StreamExt};
298    use prometheus_client::metrics::counter::Counter;
299    use std::panic::{catch_unwind, AssertUnwindSafe};
300    use std::sync::Mutex;
301    use utils::reschedule;
302
303    fn test_error_future(runner: impl Runner) {
304        async fn error_future() -> Result<&'static str, &'static str> {
305            Err("An error occurred")
306        }
307        let result = runner.start(error_future());
308        assert_eq!(result, Err("An error occurred"));
309    }
310
311    fn test_clock_sleep(runner: impl Runner, context: impl Spawner + Clock) {
312        runner.start(async move {
313            let start = context.current();
315            let sleep_duration = Duration::from_millis(10);
316            context.sleep(sleep_duration).await;
317
318            let end = context.current();
320            assert!(end.duration_since(start).unwrap() >= sleep_duration);
321        });
322    }
323
324    fn test_clock_sleep_until(runner: impl Runner, context: impl Spawner + Clock) {
325        runner.start(async move {
326            let now = context.current();
328            context.sleep_until(now + Duration::from_millis(100)).await;
329
330            let elapsed = now.elapsed().unwrap();
332            assert!(elapsed >= Duration::from_millis(100));
333        });
334    }
335
336    fn test_root_finishes(runner: impl Runner, context: impl Spawner) {
337        runner.start(async move {
338            context.spawn(|_| async move {
339                loop {
340                    reschedule().await;
341                }
342            });
343        });
344    }
345
346    fn test_spawn_abort(runner: impl Runner, context: impl Spawner) {
347        runner.start(async move {
348            let handle = context.spawn(|_| async move {
349                loop {
350                    reschedule().await;
351                }
352            });
353            handle.abort();
354            assert_eq!(handle.await, Err(Error::Closed));
355        });
356    }
357
358    fn test_panic_aborts_root(runner: impl Runner) {
359        let result = catch_unwind(AssertUnwindSafe(|| {
360            runner.start(async move {
361                panic!("blah");
362            });
363        }));
364        result.unwrap_err();
365    }
366
367    fn test_panic_aborts_spawn(runner: impl Runner, context: impl Spawner) {
368        let result = runner.start(async move {
369            let result = context.spawn(|_| async move {
370                panic!("blah");
371            });
372            assert_eq!(result.await, Err(Error::Exited));
373            Result::<(), Error>::Ok(())
374        });
375
376        result.unwrap();
378    }
379
380    fn test_select(runner: impl Runner) {
381        runner.start(async move {
382            let output = Mutex::new(0);
384            select! {
385                v1 = ready(1) => {
386                    *output.lock().unwrap() = v1;
387                },
388                v2 = ready(2) => {
389                    *output.lock().unwrap() = v2;
390                },
391            };
392            assert_eq!(*output.lock().unwrap(), 1);
393
394            select! {
396                v1 = std::future::pending::<i32>() => {
397                    *output.lock().unwrap() = v1;
398                },
399                v2 = ready(2) => {
400                    *output.lock().unwrap() = v2;
401                },
402            };
403            assert_eq!(*output.lock().unwrap(), 2);
404        });
405    }
406
407    fn test_select_loop(runner: impl Runner, context: impl Clock) {
409        runner.start(async move {
410            let (mut sender, mut receiver) = mpsc::unbounded();
412            for _ in 0..2 {
413                select! {
414                    v = receiver.next() => {
415                        panic!("unexpected value: {:?}", v);
416                    },
417                    _ = context.sleep(Duration::from_millis(100)) => {
418                        continue;
419                    },
420                };
421            }
422
423            sender.send(0).await.unwrap();
425            sender.send(1).await.unwrap();
426
427            select! {
429                _ = async {} => {
430                    },
432                v = receiver.next() => {
433                    panic!("unexpected value: {:?}", v);
434                },
435            };
436
437            for i in 0..2 {
439                select! {
440                    _ = context.sleep(Duration::from_millis(100)) => {
441                        panic!("timeout");
442                    },
443                    v = receiver.next() => {
444                        assert_eq!(v.unwrap(), i);
445                    },
446                };
447            }
448        });
449    }
450
451    fn test_storage_operations<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
452    where
453        B: Blob,
454    {
455        runner.start(async move {
456            let partition = "test_partition";
457            let name = b"test_blob";
458
459            let blob = context
461                .open(partition, name)
462                .await
463                .expect("Failed to open blob");
464
465            let data = b"Hello, Storage!";
467            blob.write_at(data, 0)
468                .await
469                .expect("Failed to write to blob");
470
471            blob.sync().await.expect("Failed to sync blob");
473
474            let mut buffer = vec![0u8; data.len()];
476            blob.read_at(&mut buffer, 0)
477                .await
478                .expect("Failed to read from blob");
479            assert_eq!(&buffer, data);
480
481            let length = blob.len().await.expect("Failed to get blob length");
483            assert_eq!(length, data.len() as u64);
484
485            blob.close().await.expect("Failed to close blob");
487
488            let blobs = context
490                .scan(partition)
491                .await
492                .expect("Failed to scan partition");
493            assert!(blobs.contains(&name.to_vec()));
494
495            let blob = context
497                .open(partition, name)
498                .await
499                .expect("Failed to reopen blob");
500
501            let mut buffer = vec![0u8; 7];
503            blob.read_at(&mut buffer, 7)
504                .await
505                .expect("Failed to read data");
506            assert_eq!(&buffer, b"Storage");
507
508            blob.close().await.expect("Failed to close blob");
510
511            context
513                .remove(partition, Some(name))
514                .await
515                .expect("Failed to remove blob");
516
517            let blobs = context
519                .scan(partition)
520                .await
521                .expect("Failed to scan partition");
522            assert!(!blobs.contains(&name.to_vec()));
523
524            context
526                .remove(partition, None)
527                .await
528                .expect("Failed to remove partition");
529
530            let result = context.scan(partition).await;
532            assert!(matches!(result, Err(Error::PartitionMissing(_))));
533        });
534    }
535
536    fn test_blob_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
537    where
538        B: Blob,
539    {
540        runner.start(async move {
541            let partition = "test_partition";
542            let name = b"test_blob_rw";
543
544            let blob = context
546                .open(partition, name)
547                .await
548                .expect("Failed to open blob");
549
550            let data1 = b"Hello";
552            let data2 = b"World";
553            blob.write_at(data1, 0)
554                .await
555                .expect("Failed to write data1");
556            blob.write_at(data2, 5)
557                .await
558                .expect("Failed to write data2");
559
560            let length = blob.len().await.expect("Failed to get blob length");
562            assert_eq!(length, 10);
563
564            let mut buffer = vec![0u8; 10];
566            blob.read_at(&mut buffer, 0)
567                .await
568                .expect("Failed to read data");
569            assert_eq!(&buffer[..5], data1);
570            assert_eq!(&buffer[5..], data2);
571
572            let data3 = b"Store";
574            blob.write_at(data3, 5)
575                .await
576                .expect("Failed to write data3");
577            let length = blob.len().await.expect("Failed to get blob length");
578            assert_eq!(length, 10);
579
580            blob.truncate(5).await.expect("Failed to truncate blob");
582            let length = blob.len().await.expect("Failed to get blob length");
583            assert_eq!(length, 5);
584            let mut buffer = vec![0u8; 5];
585            blob.read_at(&mut buffer, 0)
586                .await
587                .expect("Failed to read data");
588            assert_eq!(&buffer[..5], data1);
589
590            let mut buffer = vec![0u8; 10];
592            let result = blob.read_at(&mut buffer, 0).await;
593            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
594
595            blob.close().await.expect("Failed to close blob");
597        });
598    }
599
600    fn test_many_partition_read_write<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
601    where
602        B: Blob,
603    {
604        runner.start(async move {
605            let partitions = ["partition1", "partition2", "partition3"];
606            let name = b"test_blob_rw";
607
608            for (additional, partition) in partitions.iter().enumerate() {
609                let blob = context
611                    .open(partition, name)
612                    .await
613                    .expect("Failed to open blob");
614
615                let data1 = b"Hello";
617                let data2 = b"World";
618                blob.write_at(data1, 0)
619                    .await
620                    .expect("Failed to write data1");
621                blob.write_at(data2, 5 + additional as u64)
622                    .await
623                    .expect("Failed to write data2");
624
625                blob.close().await.expect("Failed to close blob");
627            }
628
629            for (additional, partition) in partitions.iter().enumerate() {
630                let blob = context
632                    .open(partition, name)
633                    .await
634                    .expect("Failed to open blob");
635
636                let mut buffer = vec![0u8; 10 + additional];
638                blob.read_at(&mut buffer, 0)
639                    .await
640                    .expect("Failed to read data");
641                assert_eq!(&buffer[..5], b"Hello");
642                assert_eq!(&buffer[5 + additional..], b"World");
643
644                blob.close().await.expect("Failed to close blob");
646            }
647        });
648    }
649
650    fn test_blob_read_past_length<B>(runner: impl Runner, context: impl Spawner + Storage<B>)
651    where
652        B: Blob,
653    {
654        runner.start(async move {
655            let partition = "test_partition";
656            let name = b"test_blob_rw";
657
658            let blob = context
660                .open(partition, name)
661                .await
662                .expect("Failed to open blob");
663
664            let mut buffer = vec![0u8; 10];
666            let result = blob.read_at(&mut buffer, 0).await;
667            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
668
669            let data = b"Hello, Storage!";
671            blob.write_at(data, 0)
672                .await
673                .expect("Failed to write to blob");
674
675            let mut buffer = vec![0u8; 20];
677            let result = blob.read_at(&mut buffer, 0).await;
678            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
679        })
680    }
681
682    fn test_blob_clone_and_concurrent_read<B>(
683        runner: impl Runner,
684        context: impl Spawner + Storage<B> + Metrics,
685    ) where
686        B: Blob,
687    {
688        runner.start(async move {
689            let partition = "test_partition";
690            let name = b"test_blob_rw";
691
692            let blob = context
694                .open(partition, name)
695                .await
696                .expect("Failed to open blob");
697
698            let data = b"Hello, Storage!";
700            blob.write_at(data, 0)
701                .await
702                .expect("Failed to write to blob");
703
704            blob.sync().await.expect("Failed to sync blob");
706
707            let check1 = context.with_label("check1").spawn({
709                let blob = blob.clone();
710                move |_| async move {
711                    let mut buffer = vec![0u8; data.len()];
712                    blob.read_at(&mut buffer, 0)
713                        .await
714                        .expect("Failed to read from blob");
715                    assert_eq!(&buffer, data);
716                }
717            });
718            let check2 = context.with_label("check2").spawn({
719                let blob = blob.clone();
720                move |_| async move {
721                    let mut buffer = vec![0u8; data.len()];
722                    blob.read_at(&mut buffer, 0)
723                        .await
724                        .expect("Failed to read from blob");
725                    assert_eq!(&buffer, data);
726                }
727            });
728
729            let result = join!(check1, check2);
731            assert!(result.0.is_ok());
732            assert!(result.1.is_ok());
733
734            let mut buffer = vec![0u8; data.len()];
736            blob.read_at(&mut buffer, 0)
737                .await
738                .expect("Failed to read from blob");
739            assert_eq!(&buffer, data);
740
741            let length = blob.len().await.expect("Failed to get blob length");
743            assert_eq!(length, data.len() as u64);
744
745            blob.close().await.expect("Failed to close blob");
747        });
748    }
749
750    fn test_shutdown(runner: impl Runner, context: impl Spawner + Clock + Metrics) {
751        let kill = 9;
752        runner.start(async move {
753            let before = context
755                .with_label("before")
756                .spawn(move |context| async move {
757                    let sig = context.stopped().await;
758                    assert_eq!(sig.unwrap(), kill);
759                });
760
761            let after = context
763                .with_label("after")
764                .spawn(move |context| async move {
765                    let mut signal = context.stopped();
767                    loop {
768                        select! {
769                            sig = &mut signal => {
770                                assert_eq!(sig.unwrap(), kill);
772                                break;
773                            },
774                            _ = context.sleep(Duration::from_millis(10)) => {
775                                },
777                        }
778                    }
779                });
780
781            context.sleep(Duration::from_millis(50)).await;
783
784            context.stop(kill);
786
787            let result = join!(before, after);
789            assert!(result.0.is_ok());
790            assert!(result.1.is_ok());
791        });
792    }
793
794    fn test_spawn_ref(runner: impl Runner, mut context: impl Spawner) {
795        runner.start(async move {
796            let handle = context.spawn_ref();
797            let result = handle(async move { 42 }).await;
798            assert_eq!(result, Ok(42));
799        });
800    }
801
802    fn test_spawn_ref_duplicate(runner: impl Runner, mut context: impl Spawner) {
803        runner.start(async move {
804            let handle = context.spawn_ref();
805            let result = handle(async move { 42 }).await;
806            assert_eq!(result, Ok(42));
807
808            let handle = context.spawn_ref();
810            let result = handle(async move { 42 }).await;
811            assert_eq!(result, Ok(42));
812        });
813    }
814
815    fn test_spawn_duplicate(runner: impl Runner, mut context: impl Spawner) {
816        runner.start(async move {
817            let handle = context.spawn_ref();
818            let result = handle(async move { 42 }).await;
819            assert_eq!(result, Ok(42));
820
821            context.spawn(|_| async move { 42 });
823        });
824    }
825
826    fn test_metrics(runner: impl Runner, context: impl Spawner + Metrics) {
827        runner.start(async move {
828            assert_eq!(context.label(), "");
830
831            let counter = Counter::<u64>::default();
833            context.register("test", "test", counter.clone());
834
835            counter.inc();
837
838            let buffer = context.encode();
840            assert!(buffer.contains("test_total 1"));
841
842            let context = context.with_label("nested");
844            let nested_counter = Counter::<u64>::default();
845            context.register("test", "test", nested_counter.clone());
846
847            nested_counter.inc();
849
850            let buffer = context.encode();
852            assert!(buffer.contains("nested_test_total 1"));
853            assert!(buffer.contains("test_total 1"));
854        });
855    }
856
857    fn test_metrics_label(runner: impl Runner, context: impl Spawner + Metrics) {
858        runner.start(async move {
859            context.with_label(METRICS_PREFIX);
860        })
861    }
862
863    #[test]
864    fn test_deterministic_future() {
865        let (runner, _, _) = deterministic::Executor::default();
866        test_error_future(runner);
867    }
868
869    #[test]
870    fn test_deterministic_clock_sleep() {
871        let (executor, context, _) = deterministic::Executor::default();
872        assert_eq!(context.current(), SystemTime::UNIX_EPOCH);
873        test_clock_sleep(executor, context);
874    }
875
876    #[test]
877    fn test_deterministic_clock_sleep_until() {
878        let (executor, context, _) = deterministic::Executor::default();
879        test_clock_sleep_until(executor, context);
880    }
881
882    #[test]
883    fn test_deterministic_root_finishes() {
884        let (executor, context, _) = deterministic::Executor::default();
885        test_root_finishes(executor, context);
886    }
887
888    #[test]
889    fn test_deterministic_spawn_abort() {
890        let (executor, context, _) = deterministic::Executor::default();
891        test_spawn_abort(executor, context);
892    }
893
894    #[test]
895    fn test_deterministic_panic_aborts_root() {
896        let (runner, _, _) = deterministic::Executor::default();
897        test_panic_aborts_root(runner);
898    }
899
900    #[test]
901    #[should_panic(expected = "blah")]
902    fn test_deterministic_panic_aborts_spawn() {
903        let (executor, context, _) = deterministic::Executor::default();
904        test_panic_aborts_spawn(executor, context);
905    }
906
907    #[test]
908    fn test_deterministic_select() {
909        let (executor, _, _) = deterministic::Executor::default();
910        test_select(executor);
911    }
912
913    #[test]
914    fn test_deterministic_select_loop() {
915        let (executor, context, _) = deterministic::Executor::default();
916        test_select_loop(executor, context);
917    }
918
919    #[test]
920    fn test_deterministic_storage_operations() {
921        let (executor, context, _) = deterministic::Executor::default();
922        test_storage_operations(executor, context);
923    }
924
925    #[test]
926    fn test_deterministic_blob_read_write() {
927        let (executor, context, _) = deterministic::Executor::default();
928        test_blob_read_write(executor, context);
929    }
930
931    #[test]
932    fn test_deterministic_many_partition_read_write() {
933        let (executor, context, _) = deterministic::Executor::default();
934        test_many_partition_read_write(executor, context);
935    }
936
937    #[test]
938    fn test_deterministic_blob_read_past_length() {
939        let (executor, context, _) = deterministic::Executor::default();
940        test_blob_read_past_length(executor, context);
941    }
942
943    #[test]
944    fn test_deterministic_blob_clone_and_concurrent_read() {
945        let (executor, context, _) = deterministic::Executor::default();
947        test_blob_clone_and_concurrent_read(executor, context.clone());
948
949        let buffer = context.encode();
951        assert!(buffer.contains("open_blobs 0"));
952    }
953
954    #[test]
955    fn test_deterministic_shutdown() {
956        let (executor, context, _) = deterministic::Executor::default();
957        test_shutdown(executor, context);
958    }
959
960    #[test]
961    fn test_deterministic_spawn_ref() {
962        let (executor, context, _) = deterministic::Executor::default();
963        test_spawn_ref(executor, context);
964    }
965
966    #[test]
967    #[should_panic]
968    fn test_deterministic_spawn_ref_duplicate() {
969        let (executor, context, _) = deterministic::Executor::default();
970        test_spawn_ref_duplicate(executor, context);
971    }
972
973    #[test]
974    #[should_panic]
975    fn test_deterministic_spawn_duplicate() {
976        let (executor, context, _) = deterministic::Executor::default();
977        test_spawn_duplicate(executor, context);
978    }
979
980    #[test]
981    fn test_deterministic_metrics() {
982        let (executor, context, _) = deterministic::Executor::default();
983        test_metrics(executor, context);
984    }
985
986    #[test]
987    #[should_panic]
988    fn test_deterministic_metrics_label() {
989        let (executor, context, _) = deterministic::Executor::default();
990        test_metrics_label(executor, context);
991    }
992
993    #[test]
994    fn test_tokio_error_future() {
995        let (runner, _) = tokio::Executor::default();
996        test_error_future(runner);
997    }
998
999    #[test]
1000    fn test_tokio_clock_sleep() {
1001        let (executor, context) = tokio::Executor::default();
1002        test_clock_sleep(executor, context);
1003    }
1004
1005    #[test]
1006    fn test_tokio_clock_sleep_until() {
1007        let (executor, context) = tokio::Executor::default();
1008        test_clock_sleep_until(executor, context);
1009    }
1010
1011    #[test]
1012    fn test_tokio_root_finishes() {
1013        let (executor, context) = tokio::Executor::default();
1014        test_root_finishes(executor, context);
1015    }
1016
1017    #[test]
1018    fn test_tokio_spawn_abort() {
1019        let (executor, context) = tokio::Executor::default();
1020        test_spawn_abort(executor, context);
1021    }
1022
1023    #[test]
1024    fn test_tokio_panic_aborts_root() {
1025        let (runner, _) = tokio::Executor::default();
1026        test_panic_aborts_root(runner);
1027    }
1028
1029    #[test]
1030    fn test_tokio_panic_aborts_spawn() {
1031        let (executor, context) = tokio::Executor::default();
1032        test_panic_aborts_spawn(executor, context);
1033    }
1034
1035    #[test]
1036    fn test_tokio_select() {
1037        let (executor, _) = tokio::Executor::default();
1038        test_select(executor);
1039    }
1040
1041    #[test]
1042    fn test_tokio_select_loop() {
1043        let (executor, context) = tokio::Executor::default();
1044        test_select_loop(executor, context);
1045    }
1046
1047    #[test]
1048    fn test_tokio_storage_operations() {
1049        let (executor, context) = tokio::Executor::default();
1050        test_storage_operations(executor, context);
1051    }
1052
1053    #[test]
1054    fn test_tokio_blob_read_write() {
1055        let (executor, context) = tokio::Executor::default();
1056        test_blob_read_write(executor, context);
1057    }
1058
1059    #[test]
1060    fn test_tokio_many_partition_read_write() {
1061        let (executor, context) = tokio::Executor::default();
1062        test_many_partition_read_write(executor, context);
1063    }
1064
1065    #[test]
1066    fn test_tokio_blob_read_past_length() {
1067        let (executor, context) = tokio::Executor::default();
1068        test_blob_read_past_length(executor, context);
1069    }
1070
1071    #[test]
1072    fn test_tokio_blob_clone_and_concurrent_read() {
1073        let (executor, context) = tokio::Executor::default();
1075        test_blob_clone_and_concurrent_read(executor, context.clone());
1076
1077        let buffer = context.encode();
1079        assert!(buffer.contains("open_blobs 0"));
1080    }
1081
1082    #[test]
1083    fn test_tokio_shutdown() {
1084        let (executor, context) = tokio::Executor::default();
1085        test_shutdown(executor, context);
1086    }
1087
1088    #[test]
1089    fn test_tokio_spawn_ref() {
1090        let (executor, context) = tokio::Executor::default();
1091        test_spawn_ref(executor, context);
1092    }
1093
1094    #[test]
1095    #[should_panic]
1096    fn test_tokio_spawn_ref_duplicate() {
1097        let (executor, context) = tokio::Executor::default();
1098        test_spawn_ref_duplicate(executor, context);
1099    }
1100
1101    #[test]
1102    #[should_panic]
1103    fn test_tokio_spawn_duplicate() {
1104        let (executor, context) = tokio::Executor::default();
1105        test_spawn_duplicate(executor, context);
1106    }
1107
1108    #[test]
1109    fn test_tokio_metrics() {
1110        let (executor, context) = tokio::Executor::default();
1111        test_metrics(executor, context);
1112    }
1113
1114    #[test]
1115    #[should_panic]
1116    fn test_tokio_metrics_label() {
1117        let (executor, context) = tokio::Executor::default();
1118        test_metrics_label(executor, context);
1119    }
1120}