async_rs/
runtime.rs

1use crate::{
2    sys::IO,
3    traits::{AsyncIOHandle, AsyncToSocketAddrs, Executor, Reactor, RuntimeKit, Task},
4    util::IOHandle,
5};
6use futures_core::Stream;
7use std::{
8    future::Future,
9    io,
10    net::{SocketAddr, ToSocketAddrs},
11    time::{Duration, Instant},
12};
13
14/// A full-featured Runtime implementation
15#[derive(Debug)]
16pub struct Runtime<RK: RuntimeKit + 'static> {
17    kit: RK,
18}
19
20impl<RK: RuntimeKit + 'static> Runtime<RK> {
21    /// Create a new Runtime from a RuntimeKit
22    pub fn new(kit: RK) -> Self {
23        Self { kit }
24    }
25
26    /// Asynchronously resolve the given domain name
27    pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
28        &self,
29        addrs: A,
30    ) -> impl AsyncToSocketAddrs
31    where
32        <A as std::net::ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
33    {
34        SocketAddrsResolver {
35            runtime: self,
36            addrs,
37        }
38    }
39}
40
41impl<RK: RuntimeKit + 'static> From<RK> for Runtime<RK> {
42    fn from(kit: RK) -> Self {
43        Self::new(kit)
44    }
45}
46
47impl<RK: RuntimeKit + 'static> Executor for Runtime<RK> {
48    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
49        self.kit.block_on(f)
50    }
51
52    fn spawn<T: Send + 'static>(
53        &self,
54        f: impl Future<Output = T> + Send + 'static,
55    ) -> impl Task<T> {
56        self.kit.spawn(f)
57    }
58
59    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
60        &self,
61        f: F,
62    ) -> impl Task<T> {
63        self.kit.spawn_blocking(f)
64    }
65}
66
67impl<RK: RuntimeKit + Sync + 'static> Reactor for Runtime<RK> {
68    fn register<H: IO + Send + 'static>(
69        &self,
70        socket: IOHandle<H>,
71    ) -> io::Result<impl AsyncIOHandle + Send> {
72        self.kit.register(socket)
73    }
74
75    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
76        self.kit.sleep(dur)
77    }
78
79    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
80        self.kit.interval(dur)
81    }
82
83    fn tcp_connect(
84        &self,
85        addr: SocketAddr,
86    ) -> impl Future<Output = io::Result<impl AsyncIOHandle + Send>> + Send {
87        self.kit.tcp_connect(addr)
88    }
89}
90
91struct SocketAddrsResolver<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> {
92    runtime: &'a Runtime<RK>,
93    addrs: A,
94}
95
96impl<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> AsyncToSocketAddrs
97    for SocketAddrsResolver<'a, RK, A>
98where
99    <A as ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
100{
101    fn to_socket_addrs(
102        self,
103    ) -> impl Future<Output = io::Result<impl Iterator<Item = SocketAddr> + Send>> + Send {
104        let SocketAddrsResolver { runtime, addrs } = self;
105        runtime.spawn_blocking(move || addrs.to_socket_addrs())
106    }
107}