1use orb::io::{AsyncFd, AsyncIO};
19pub use orb::runtime::{AsyncExec, AsyncJoinHandle};
20use orb::time::{AsyncTime, TimeInterval};
21use std::fmt;
22use std::future::Future;
23use std::io;
24use std::net::SocketAddr;
25use std::net::TcpStream;
26use std::ops::Deref;
27use std::os::fd::{AsFd, AsRawFd};
28use std::os::unix::net::UnixStream;
29use std::path::PathBuf;
30use std::pin::Pin;
31use std::task::*;
32use std::time::{Duration, Instant};
33use tokio::runtime::{Builder, Handle, Runtime};
34
35pub enum TokioRT {
37 Runtime(Runtime),
38 Handle(Handle),
39}
40
41impl fmt::Debug for TokioRT {
42 #[inline]
43 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44 match self {
45 Self::Runtime(_) => write!(f, "tokio(rt)"),
46 Self::Handle(_) => write!(f, "tokio(handle)"),
47 }
48 }
49}
50
51impl TokioRT {
52 #[inline]
54 pub fn new_with_runtime(rt: Runtime) -> Self {
55 Self::Runtime(rt)
56 }
57
58 #[inline]
59 pub fn new_multi_thread(workers: usize) -> Self {
60 let mut builder = Builder::new_multi_thread();
61 if workers > 0 {
62 builder.worker_threads(workers);
63 }
64 Self::Runtime(builder.enable_all().build().unwrap())
65 }
66
67 #[inline]
68 pub fn new_current_thread() -> Self {
69 let mut builder = Builder::new_current_thread();
70 Self::Runtime(builder.enable_all().build().unwrap())
71 }
72
73 #[inline]
76 pub fn new_with_handle(handle: Handle) -> Self {
77 Self::Handle(handle)
78 }
79}
80
81impl orb::AsyncRuntime for TokioRT {}
82
83impl AsyncIO for TokioRT {
84 type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static> = TokioFD<T>;
85
86 #[inline(always)]
87 async fn connect_tcp(addr: &SocketAddr) -> io::Result<Self::AsyncFd<TcpStream>> {
88 let stream = tokio::net::TcpStream::connect(addr).await?;
89 Self::to_async_fd_rw(stream.into_std()?)
91 }
92
93 #[inline(always)]
94 async fn connect_unix(addr: &PathBuf) -> io::Result<Self::AsyncFd<UnixStream>> {
95 let stream = tokio::net::UnixStream::connect(addr).await?;
96 Self::to_async_fd_rw(stream.into_std()?)
98 }
99
100 #[inline(always)]
101 fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
102 fd: T,
103 ) -> io::Result<Self::AsyncFd<T>> {
104 use tokio::io;
105 Ok(TokioFD(io::unix::AsyncFd::with_interest(fd, io::Interest::READABLE)?))
106 }
107
108 #[inline(always)]
109 fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
110 fd: T,
111 ) -> io::Result<Self::AsyncFd<T>> {
112 use tokio::io;
113 use tokio::io::Interest;
114 Ok(TokioFD(io::unix::AsyncFd::with_interest(fd, Interest::READABLE | Interest::WRITABLE)?))
115 }
116}
117
118impl AsyncTime for TokioRT {
119 type Interval = TokioInterval;
120
121 #[inline(always)]
122 fn sleep(d: Duration) -> impl Future + Send {
123 tokio::time::sleep(d)
124 }
125
126 #[inline(always)]
127 fn tick(d: Duration) -> Self::Interval {
128 let later = tokio::time::Instant::now() + d;
129 TokioInterval(tokio::time::interval_at(later, d))
130 }
131}
132
133impl AsyncExec for TokioRT {
134 #[inline]
136 fn spawn<F, R>(&self, f: F) -> impl AsyncJoinHandle<R>
137 where
138 F: Future<Output = R> + Send + 'static,
139 R: Send + 'static,
140 {
141 match self {
142 Self::Runtime(s) => {
143 return TokioJoinHandle(s.spawn(f));
144 }
145 Self::Handle(s) => {
146 return TokioJoinHandle(s.spawn(f));
147 }
148 }
149 }
150
151 #[inline]
153 fn spawn_detach<F, R>(&self, f: F)
154 where
155 F: Future<Output = R> + Send + 'static,
156 R: Send + 'static,
157 {
158 match self {
159 Self::Runtime(s) => {
160 s.spawn(f);
161 }
162 Self::Handle(s) => {
163 s.spawn(f);
164 }
165 }
166 }
167
168 #[inline(always)]
169 fn spawn_blocking<F, R>(f: F) -> impl AsyncJoinHandle<R>
170 where
171 F: FnOnce() -> R + Send + 'static,
172 R: Send + 'static,
173 {
174 TokioJoinHandle(tokio::task::spawn_blocking(f))
175 }
176
177 #[inline]
179 fn block_on<F, R>(&self, f: F) -> R
180 where
181 F: Future<Output = R> + Send,
182 R: Send + 'static,
183 {
184 match self {
185 Self::Runtime(s) => {
186 return s.block_on(f);
187 }
188 Self::Handle(s) => {
189 return s.block_on(f);
190 }
191 }
192 }
193}
194
195pub struct TokioInterval(tokio::time::Interval);
197
198impl TimeInterval for TokioInterval {
199 #[inline]
200 fn poll_tick(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Instant> {
201 let _self = self.get_mut();
202 if let Poll::Ready(i) = _self.0.poll_tick(ctx) {
203 Poll::Ready(i.into_std())
204 } else {
205 Poll::Pending
206 }
207 }
208}
209
210pub struct TokioFD<T: AsRawFd + AsFd + Send + Sync + 'static>(tokio::io::unix::AsyncFd<T>);
212
213impl<T: AsRawFd + AsFd + Send + Sync + 'static> AsyncFd<T> for TokioFD<T> {
214 #[inline(always)]
215 async fn async_read<R>(&self, f: impl FnMut(&T) -> io::Result<R> + Send) -> io::Result<R> {
216 self.0.async_io(tokio::io::Interest::READABLE, f).await
217 }
218
219 #[inline(always)]
220 async fn async_write<R>(&self, f: impl FnMut(&T) -> io::Result<R> + Send) -> io::Result<R> {
221 self.0.async_io(tokio::io::Interest::WRITABLE, f).await
222 }
223}
224
225impl<T: AsRawFd + AsFd + Send + Sync + 'static> Deref for TokioFD<T> {
226 type Target = T;
227
228 #[inline(always)]
229 fn deref(&self) -> &Self::Target {
230 self.0.get_ref()
231 }
232}
233
234pub struct TokioJoinHandle<T>(tokio::task::JoinHandle<T>);
236
237impl<T: Send> AsyncJoinHandle<T> for TokioJoinHandle<T> {
238 #[inline]
239 async fn join(self) -> Result<T, ()> {
240 match self.0.await {
241 Ok(r) => Ok(r),
242 Err(_) => Err(()),
243 }
244 }
245
246 #[inline]
247 fn detach(self) {
248 }
251}