1use crate::{
2 AsyncIOHandle, AsyncToSocketAddrs, Executor, IOHandle, Reactor, RuntimeKit, Task, sys::IO,
3};
4use async_trait::async_trait;
5use futures_core::Stream;
6use std::{
7 fmt,
8 future::Future,
9 io,
10 net::{SocketAddr, ToSocketAddrs},
11 time::{Duration, Instant},
12};
13
14#[derive(Debug)]
16pub struct Runtime<RK: RuntimeKit + 'static> {
17 kit: RK,
18}
19
20impl<RK: RuntimeKit + 'static> Runtime<RK> {
21 pub fn new(kit: RK) -> Self {
23 Self { kit }
24 }
25
26 pub fn to_socket_addrs<A: ToSocketAddrs + Send + 'static>(
28 &self,
29 addrs: A,
30 ) -> impl AsyncToSocketAddrs
31 where
32 <A as std::net::ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
33 {
34 SocketAddrsResolver {
35 runtime: self,
36 addrs,
37 }
38 }
39}
40
41impl<RK: RuntimeKit + 'static> From<RK> for Runtime<RK> {
42 fn from(kit: RK) -> Self {
43 Self::new(kit)
44 }
45}
46
47impl<RK: RuntimeKit + 'static> Executor for Runtime<RK> {
48 fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
49 self.kit.block_on(f)
50 }
51
52 fn spawn<T: Send + 'static>(
53 &self,
54 f: impl Future<Output = T> + Send + 'static,
55 ) -> impl Task<T> {
56 self.kit.spawn(f)
57 }
58
59 fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
60 &self,
61 f: F,
62 ) -> impl Task<T> {
63 self.kit.spawn_blocking(f)
64 }
65}
66
67#[async_trait]
68impl<RK: RuntimeKit + Sync + 'static> Reactor for Runtime<RK> {
69 fn register<H: IO + Send + 'static>(
70 &self,
71 socket: IOHandle<H>,
72 ) -> io::Result<impl AsyncIOHandle + Send> {
73 self.kit.register(socket)
74 }
75
76 fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
77 self.kit.sleep(dur)
78 }
79
80 fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
81 self.kit.interval(dur)
82 }
83
84 async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
85 self.kit.tcp_connect(addr).await
86 }
87}
88
89#[derive(Debug)]
91pub struct RuntimeParts<E: Executor, R: Reactor> {
92 executor: E,
93 reactor: R,
94}
95
96impl<E: Executor, R: Reactor> RuntimeParts<E, R> {
97 pub fn new(executor: E, reactor: R) -> Self {
99 Self { executor, reactor }
100 }
101}
102
103impl<E: Executor + Sync + fmt::Debug, R: Reactor + Sync + fmt::Debug> RuntimeKit
104 for RuntimeParts<E, R>
105{
106}
107
108impl<E: Executor, R: Reactor> Executor for RuntimeParts<E, R> {
109 fn block_on<T, F: Future<Output = T>>(&self, f: F) -> T {
110 self.executor.block_on(f)
111 }
112
113 fn spawn<T: Send + 'static>(
114 &self,
115 f: impl Future<Output = T> + Send + 'static,
116 ) -> impl Task<T> {
117 self.executor.spawn(f)
118 }
119
120 fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(
121 &self,
122 f: F,
123 ) -> impl Task<T> {
124 self.executor.spawn_blocking(f)
125 }
126}
127
128#[async_trait]
129impl<E: Executor + Sync, R: Reactor + Sync> Reactor for RuntimeParts<E, R> {
130 fn register<H: IO + Send + 'static>(
131 &self,
132 socket: IOHandle<H>,
133 ) -> io::Result<impl AsyncIOHandle + Send> {
134 self.reactor.register(socket)
135 }
136
137 fn sleep(&self, dur: Duration) -> impl Future<Output = ()> {
138 self.reactor.sleep(dur)
139 }
140
141 fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> {
142 self.reactor.interval(dur)
143 }
144
145 async fn tcp_connect(&self, addr: SocketAddr) -> io::Result<impl AsyncIOHandle + Send> {
146 self.reactor.tcp_connect(addr).await
147 }
148}
149
150struct SocketAddrsResolver<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> {
151 runtime: &'a Runtime<RK>,
152 addrs: A,
153}
154
155impl<'a, RK: RuntimeKit + 'static, A: ToSocketAddrs + Send + 'static> AsyncToSocketAddrs
156 for SocketAddrsResolver<'a, RK, A>
157where
158 <A as ToSocketAddrs>::Iter: Iterator<Item = SocketAddr> + Send + 'static,
159{
160 fn to_socket_addrs(
161 self,
162 ) -> impl Future<Output = io::Result<impl Iterator<Item = SocketAddr> + Send>> + Send {
163 let SocketAddrsResolver { runtime, addrs } = self;
164 runtime.spawn_blocking(move || addrs.to_socket_addrs())
165 }
166}