Skip to main content

async_rs/
runtime.rs

1use crate::{
2    sys::AsSysFd,
3    traits::{Executor, Reactor, RuntimeKit},
4    util::{SocketAddrsResolver, Task},
5};
6use futures_core::Stream;
7use futures_io::{AsyncRead, AsyncWrite};
8use std::{
9    future::Future,
10    io::{self, Read, Write},
11    net::{SocketAddr, ToSocketAddrs},
12    time::{Duration, Instant},
13};
14
15/// A full-featured Runtime implementation
16#[derive(Clone, Debug)]
17pub struct Runtime<RK: RuntimeKit> {
18    kit: RK,
19}
20
21impl<RK: RuntimeKit> Runtime<RK> {
22    /// Create a new Runtime from a RuntimeKit
23    pub fn new(kit: RK) -> Self {
24        Self { kit }
25    }
26
27    /// Asynchronously resolve the given domain name
28    pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
29        &self,
30        addrs: A,
31    ) -> SocketAddrsResolver<'_, RK, A>
32    where
33        <A as std::net::ToSocketAddrs>::Iter: Send + 'static,
34    {
35        SocketAddrsResolver {
36            runtime: self,
37            addrs,
38        }
39    }
40
41    /// Check if an `std::io::Error` is a runtime shutdown error
42    // FIXME: move this to Reactor trait for next semver breaking release
43    pub fn is_runtime_shutdown_error(&self, err: &io::Error) -> bool {
44        #[cfg(feature = "tokio")]
45        if tokio::runtime::is_rt_shutdown_err(err) {
46            return true;
47        }
48        let _ = err;
49        false
50    }
51}
52
53impl<RK: RuntimeKit> From<RK> for Runtime<RK> {
54    fn from(kit: RK) -> Self {
55        Self::new(kit)
56    }
57}
58
59impl<RK: RuntimeKit> Executor for Runtime<RK> {
60    type Task<T: Send + 'static> = <RK as Executor>::Task<T>;
61
62    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
63        self.kit.block_on(f)
64    }
65
66    fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
67        &self,
68        f: F,
69    ) -> Task<Self::Task<T>> {
70        self.kit.spawn(f)
71    }
72
73    fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
74        &self,
75        f: F,
76    ) -> Task<Self::Task<T>> {
77        self.kit.spawn_blocking(f)
78    }
79}
80
81impl<RK: RuntimeKit> Reactor for Runtime<RK> {
82    type TcpStream = <RK as Reactor>::TcpStream;
83    type Sleep = <RK as Reactor>::Sleep;
84
85    fn register<H: Read + Write + AsSysFd + Send + 'static>(
86        &self,
87        socket: H,
88    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
89        self.kit.register(socket)
90    }
91
92    fn sleep(&self, dur: Duration) -> Self::Sleep {
93        self.kit.sleep(dur)
94    }
95
96    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
97        self.kit.interval(dur)
98    }
99
100    fn tcp_connect_addr(
101        &self,
102        addr: SocketAddr,
103    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
104        self.kit.tcp_connect_addr(addr)
105    }
106}