async_rs/
runtime.rs

1use crate::{
2    sys::IO,
3    traits::{AsyncIOHandle, AsyncToSocketAddrs, Executor, Reactor, RuntimeKit, Task},
4    util::IOHandle,
5};
6use async_trait::async_trait;
7use futures_core::Stream;
8use std::{
9    future::Future,
10    io,
11    net::{SocketAddr, ToSocketAddrs},
12    time::{Duration, Instant},
13};
14
15/// A full-featured Runtime implementation
16#[derive(Debug)]
17pub struct Runtime<RK: RuntimeKit + 'static> {
18    kit: RK,
19}
20
21impl<RK: RuntimeKit + 'static> Runtime<RK> {
22    /// Create a new Runtime from a RuntimeKit
23    pub fn new(kit: RK) -> Self {
24        Self { kit }
25    }
26
27    /// Asynchronously resolve the given domain name
28    pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
29        &self,
30        addrs: A,
31    ) -> impl AsyncToSocketAddrs
32    where
33        <A as std::net::ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
34    {
35        SocketAddrsResolver {
36            runtime: self,
37            addrs,
38        }
39    }
40}
41
42impl<RK: RuntimeKit + 'static> From<RK> for Runtime<RK> {
43    fn from(kit: RK) -> Self {
44        Self::new(kit)
45    }
46}
47
48impl<RK: RuntimeKit + 'static> Executor for Runtime<RK> {
49    fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
50        self.kit.block_on(f)
51    }
52
53    fn spawn<T: Send + 'static>(
54        &self,
55        f: impl Future<Output = T> + Send + 'static,
56    ) -> impl Task<T> {
57        self.kit.spawn(f)
58    }
59
60    fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
61        &self,
62        f: F,
63    ) -> impl Task<T> {
64        self.kit.spawn_blocking(f)
65    }
66}
67
68#[async_trait]
69impl<RK: RuntimeKit + Sync + 'static> Reactor for Runtime<RK> {
70    fn register<H: IO + Send + 'static>(
71        &self,
72        socket: IOHandle<H>,
73    ) -> io::Result<impl AsyncIOHandle + Send> {
74        self.kit.register(socket)
75    }
76
77    fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
78        self.kit.sleep(dur)
79    }
80
81    fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
82        self.kit.interval(dur)
83    }
84
85    async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
86        self.kit.tcp_connect(addr).await
87    }
88}
89
90struct SocketAddrsResolver<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> {
91    runtime: &'a Runtime<RK>,
92    addrs: A,
93}
94
95impl<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> AsyncToSocketAddrs
96    for SocketAddrsResolver<'a, RK, A>
97where
98    <A as ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
99{
100    fn to_socket_addrs(
101        self,
102    ) -> impl Future<Output = io::Result<impl Iterator<Item = SocketAddr> + Send>> + Send {
103        let SocketAddrsResolver { runtime, addrs } = self;
104        runtime.spawn_blocking(move || addrs.to_socket_addrs())
105    }
106}