Skip to main content

async_rs/
runtime.rs

1use crate::{
2    sys::AsSysFd,
3    traits::{Executor, Reactor, RuntimeKit},
4    util::{SocketAddrsResolver, Task},
5};
6use futures_core::Stream;
7use futures_io::{AsyncRead, AsyncWrite};
8use std::{
9    future::Future,
10    io::{self, Read, Write},
11    net::{SocketAddr, ToSocketAddrs},
12    time::{Duration, Instant},
13};
14
15/// A full-featured async runtime that combines an executor and a reactor.
16///
17/// `Runtime<RK>` wraps any [`RuntimeKit`] and adds higher-level helpers such as
18/// domain-name resolution ([`Runtime::to_socket_addrs`]) and runtime-shutdown
19/// error detection ([`Runtime::is_runtime_shutdown_error`]).
20///
21/// Concrete type aliases — [`NoopRuntime`], [`SmolRuntime`], [`TokioRuntime`] — are
22/// provided for each built-in backend and are the usual entry points.
23#[derive(Clone, Debug)]
24pub struct Runtime<RK: RuntimeKit> {
25    kit: RK,
26}
27
28impl<RK: RuntimeKit> Runtime<RK> {
29    /// Create a new Runtime from a RuntimeKit
30    pub fn new(kit: RK) -> Self {
31        Self { kit }
32    }
33
34    /// Asynchronously resolve the given domain name
35    pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
36        &self,
37        addrs: A,
38    ) -> SocketAddrsResolver<'_, RK, A>
39    where
40        <A as ToSocketAddrs>::Iter: Send + 'static,
41    {
42        SocketAddrsResolver {
43            runtime: self,
44            addrs,
45        }
46    }
47
48    /// Check if an `std::io::Error` is a runtime shutdown error
49    pub fn is_runtime_shutdown_error(&self, err: &io::Error) -> bool {
50        #[cfg(feature = "tokio")]
51        if tokio::runtime::is_rt_shutdown_err(err) {
52            return true;
53        }
54        #[cfg(not(feature = "tokio"))]
55        let _ = err;
56        false
57    }
58}
59
60impl<RK: RuntimeKit> From<RK> for Runtime<RK> {
61    fn from(kit: RK) -> Self {
62        Self::new(kit)
63    }
64}
65
66impl<RK: RuntimeKit> Executor for Runtime<RK> {
67    type Task<T: Send + 'static> = <RK as Executor>::Task<T>;
68
69    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
70        self.kit.block_on(f)
71    }
72
73    fn spawn<T: Send + 'static, F: Future<Output = T> + Send + 'static>(
74        &self,
75        f: F,
76    ) -> Task<Self::Task<T>> {
77        self.kit.spawn(f)
78    }
79
80    fn spawn_blocking<T: Send + 'static, F: FnOnce() -> T + Send + 'static>(
81        &self,
82        f: F,
83    ) -> Task<Self::Task<T>> {
84        self.kit.spawn_blocking(f)
85    }
86}
87
88impl<RK: RuntimeKit> Reactor for Runtime<RK> {
89    type TcpStream = <RK as Reactor>::TcpStream;
90    type Sleep = <RK as Reactor>::Sleep;
91
92    fn register<H: Read + Write + AsSysFd + Send + 'static>(
93        &self,
94        socket: H,
95    ) -> io::Result<impl AsyncRead + AsyncWrite + Send + Unpin + 'static> {
96        self.kit.register(socket)
97    }
98
99    fn sleep(&self, dur: Duration) -> Self::Sleep {
100        self.kit.sleep(dur)
101    }
102
103    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {
104        self.kit.interval(dur)
105    }
106
107    fn tcp_connect_addr(
108        &self,
109        addr: SocketAddr,
110    ) -> impl Future<Output = io::Result<Self::TcpStream>> + Send + 'static {
111        self.kit.tcp_connect_addr(addr)
112    }
113}