1use crate::{
2 sys::AsSysFd,
3 traits::{Executor, Reactor, RuntimeKit},
4 util::{SocketAddrsResolver, Task},
5};
6use futures_core::Stream;
7use futures_io::{AsyncRead, AsyncWrite};
8use std::{
9 future::Future,
10 io::{self, Read, Write},
11 net::{SocketAddr, ToSocketAddrs},
12 time::{Duration, Instant},
13};
14
15#[derive(Clone, Debug)]
17pub struct Runtime<RK: RuntimeKit> {
18 kit: RK,
19}
20
21impl<RK: RuntimeKit> Runtime<RK> {
22 pub fn new(kit: RK) -> Self {
24 Self { kit }
25 }
26
27 pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
29 &self,
30 addrs: A,
31 ) -> SocketAddrsResolver<'_, RK, A>
32 where
33 <A as std::net::ToSocketAddrs>::Iter: Send + 'static,
34 {
35 SocketAddrsResolver {
36 runtime: self,
37 addrs,
38 }
39 }
40
41 pub fn is_runtime_shutdown_error(&self, err: &io::Error) -> bool {
43 #[cfg(feature = "tokio")]
44 if tokio::runtime::is_rt_shutdown_err(err) {
45 return true;
46 }
47 let _ = err;
48 false
49 }
50}
51
52impl<RK: RuntimeKit> From<RK> for Runtime<RK> {
53 fn from(kit: RK) -> Self {
54 Self::new(kit)
55 }
56}
57
58impl<RK: RuntimeKit> Executor for Runtime<RK> {
59 type Task<T: Send + 'static> = <RK as Executor>::Task<T>;
60
61 fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
62 self.kit.block_on(f)
63 }
64
65 fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
66 &self,
67 f: F,
68 ) -> Task<Self::Task<T>> {
69 self.kit.spawn(f)
70 }
71
72 fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
73 &self,
74 f: F,
75 ) -> Task<Self::Task<T>> {
76 self.kit.spawn_blocking(f)
77 }
78}
79
80impl<RK: RuntimeKit> Reactor for Runtime<RK> {
81 type TcpStream = <RK as Reactor>::TcpStream;
82 type Sleep = <RK as Reactor>::Sleep;
83
84 fn register<H: Read + Write + AsSysFd + Send + 'static>(
85 &self,
86 socket: H,
87 ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
88 self.kit.register(socket)
89 }
90
91 fn sleep(&self, dur: Duration) -> Self::Sleep {
92 self.kit.sleep(dur)
93 }
94
95 fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
96 self.kit.interval(dur)
97 }
98
99 fn tcp_connect_addr(
100 &self,
101 addr: SocketAddr,
102 ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
103 self.kit.tcp_connect_addr(addr)
104 }
105}