async_rs/
runtime.rs

1use crate::{
2    AsyncIOHandle, AsyncToSocketAddrs, Executor, IOHandle, Reactor, RuntimeKit, Task, sys::IO,
3};
4use async_trait::async_trait;
5use futures_core::Stream;
6use std::{
7    fmt,
8    future::Future,
9    io,
10    net::{SocketAddr, ToSocketAddrs},
11    time::{Duration, Instant},
12};
13
14/// A full-featured Runtime implementation
15#[derive(Debug)]
16pub struct Runtime<RK: RuntimeKit + 'static> {
17    kit: RK,
18}
19
20impl<RK: RuntimeKit + 'static> Runtime<RK> {
21    /// Create a new Runtime from a RuntimeKit
22    pub fn new(kit: RK) -> Self {
23        Self { kit }
24    }
25
26    /// Asynchronously resolve the given domain name
27    pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
28        &self,
29        addrs: A,
30    ) -> impl AsyncToSocketAddrs
31    where
32        <A as std::net::ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
33    {
34        SocketAddrsResolver {
35            runtime: self,
36            addrs,
37        }
38    }
39}
40
41impl<RK: RuntimeKit + 'static> From<RK> for Runtime<RK> {
42    fn from(kit: RK) -> Self {
43        Self::new(kit)
44    }
45}
46
47impl<RK: RuntimeKit + 'static> Executor for Runtime<RK> {
48    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
49        self.kit.block_on(f)
50    }
51
52    fn spawn<T: Send + 'static>(
53        &self,
54        f: impl Future<Output = T> + Send + 'static,
55    ) -> impl Task<T> {
56        self.kit.spawn(f)
57    }
58
59    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
60        &self,
61        f: F,
62    ) -> impl Task<T> {
63        self.kit.spawn_blocking(f)
64    }
65}
66
67#[async_trait]
68impl<RK: RuntimeKit + Sync + 'static> Reactor for Runtime<RK> {
69    fn register<H: IO + Send + 'static>(
70        &self,
71        socket: IOHandle<H>,
72    ) -> io::Result<impl AsyncIOHandle + Send> {
73        self.kit.register(socket)
74    }
75
76    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
77        self.kit.sleep(dur)
78    }
79
80    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
81        self.kit.interval(dur)
82    }
83
84    async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
85        self.kit.tcp_connect(addr).await
86    }
87}
88
89/// Wrapper around separate Executor and Reactor implementing RuntimeKit
90#[derive(Debug)]
91pub struct RuntimeParts<E: Executor, R: Reactor> {
92    executor: E,
93    reactor: R,
94}
95
96impl<E: Executor, R: Reactor> RuntimeParts<E, R> {
97    /// Create new RuntimeParts from separate Executor and Reactor
98    pub fn new(executor: E, reactor: R) -> Self {
99        Self { executor, reactor }
100    }
101}
102
103impl<E: Executor + Sync + fmt::Debug, R: Reactor + Sync + fmt::Debug> RuntimeKit
104    for RuntimeParts<E, R>
105{
106}
107
108impl<E: Executor, R: Reactor> Executor for RuntimeParts<E, R> {
109    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
110        self.executor.block_on(f)
111    }
112
113    fn spawn<T: Send + 'static>(
114        &self,
115        f: impl Future<Output = T> + Send + 'static,
116    ) -> impl Task<T> {
117        self.executor.spawn(f)
118    }
119
120    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
121        &self,
122        f: F,
123    ) -> impl Task<T> {
124        self.executor.spawn_blocking(f)
125    }
126}
127
128#[async_trait]
129impl<E: Executor + Sync, R: Reactor + Sync> Reactor for RuntimeParts<E, R> {
130    fn register<H: IO + Send + 'static>(
131        &self,
132        socket: IOHandle<H>,
133    ) -> io::Result<impl AsyncIOHandle + Send> {
134        self.reactor.register(socket)
135    }
136
137    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
138        self.reactor.sleep(dur)
139    }
140
141    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
142        self.reactor.interval(dur)
143    }
144
145    async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
146        self.reactor.tcp_connect(addr).await
147    }
148}
149
150struct SocketAddrsResolver<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> {
151    runtime: &'a Runtime<RK>,
152    addrs: A,
153}
154
155impl<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> AsyncToSocketAddrs
156    for SocketAddrsResolver<'a, RK, A>
157where
158    <A as ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
159{
160    fn to_socket_addrs(
161        self,
162    ) -> impl Future<Output = io::Result<impl Iterator<Item = SocketAddr> + Send>> + Send {
163        let SocketAddrsResolver { runtime, addrs } = self;
164        runtime.spawn_blocking(move || addrs.to_socket_addrs())
165    }
166}