1use crate::{
2 sys::AsSysFd,
3 traits::{Executor, Reactor, RuntimeKit},
4 util::{SocketAddrsResolver, Task},
5};
6use futures_core::Stream;
7use futures_io::{AsyncRead, AsyncWrite};
8use std::{
9 future::Future,
10 io::{self, Read, Write},
11 net::{SocketAddr, ToSocketAddrs},
12 time::{Duration, Instant},
13};
14
15#[derive(Clone, Debug)]
24pub struct Runtime<RK: RuntimeKit> {
25 kit: RK,
26}
27
28impl<RK: RuntimeKit> Runtime<RK> {
29 pub fn new(kit: RK) -> Self {
31 Self { kit }
32 }
33
34 pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
36 &self,
37 addrs: A,
38 ) -> SocketAddrsResolver<'_, RK, A>
39 where
40 <A as ToSocketAddrs>::Iter: Send + 'static,
41 {
42 SocketAddrsResolver {
43 runtime: self,
44 addrs,
45 }
46 }
47
48 pub fn is_runtime_shutdown_error(&self, err: &io::Error) -> bool {
50 #[cfg(feature = "tokio")]
51 if tokio::runtime::is_rt_shutdown_err(err) {
52 return true;
53 }
54 #[cfg(not(feature = "tokio"))]
55 let _ = err;
56 false
57 }
58}
59
60impl<RK: RuntimeKit> From<RK> for Runtime<RK> {
61 fn from(kit: RK) -> Self {
62 Self::new(kit)
63 }
64}
65
66impl<RK: RuntimeKit> Executor for Runtime<RK> {
67 type Task<T: Send + 'static> = <RK as Executor>::Task<T>;
68
69 fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
70 self.kit.block_on(f)
71 }
72
73 fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
74 &self,
75 f: F,
76 ) -> Task<Self::Task<T>> {
77 self.kit.spawn(f)
78 }
79
80 fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
81 &self,
82 f: F,
83 ) -> Task<Self::Task<T>> {
84 self.kit.spawn_blocking(f)
85 }
86}
87
88impl<RK: RuntimeKit> Reactor for Runtime<RK> {
89 type TcpStream = <RK as Reactor>::TcpStream;
90 type Sleep = <RK as Reactor>::Sleep;
91
92 fn register<H: Read + Write + AsSysFd + Send + 'static>(
93 &self,
94 socket: H,
95 ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
96 self.kit.register(socket)
97 }
98
99 fn sleep(&self, dur: Duration) -> Self::Sleep {
100 self.kit.sleep(dur)
101 }
102
103 fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
104 self.kit.interval(dur)
105 }
106
107 fn tcp_connect_addr(
108 &self,
109 addr: SocketAddr,
110 ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
111 self.kit.tcp_connect_addr(addr)
112 }
113}