1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Hook called once when a task is spawned; returns the opaque per-task pointer, or `None`
/// to run that task without any further hook calls.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook called before every poll of a hooked task with the pointer produced by [`TBefore`];
/// the pointer it returns is handed to [`TExit`] once the poll returns.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook called after every poll of a hooked task with the pointer returned by [`TEnter`].
type TExit = Box<dyn Fn(*const ())>;
/// Hook called once after a hooked task's future has completed, with the pointer produced
/// by [`TBefore`].
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Spawn hooks of the current thread, stored as `(before, enter, exit, after)`.
    ///
    /// The initial hooks are inert: `before` yields `None`, `enter` yields a null pointer and
    /// `exit`/`after` do nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = {
        let before: TBefore = Box::new(|| None);
        let enter: TEnter = Box::new(|_| ptr::null());
        let exit: TExit = Box::new(|_| {});
        let after: TAfter = Box::new(|_| {});
        RefCell::new((before, enter, exit, after))
    };
}
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    use tok_io::runtime::Handle;
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Runs `fut` to completion on the current thread inside a tokio `LocalSet`.
    ///
    /// If a tokio runtime handle is already available on this thread it is reused; otherwise
    /// a new current-thread runtime with all drivers enabled is built for the call.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be built and building it fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns `f` as a local task on the current thread and returns its join handle.
    ///
    /// The thread's `before` hook from `crate::CB` is called right away. When it yields a
    /// pointer, every poll of `f` is bracketed by the `enter` and `exit` hooks, and the
    /// `after` hook runs once `f` has completed. When it yields `None`, `f` is awaited
    /// directly.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|cb| (cb.borrow().0)());
        tok_io::task::spawn_local(async move {
            match token {
                // No per-task pointer: the hooks are skipped for this task.
                None => f.await,
                Some(token) => {
                    tok_io::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|cb| (cb.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        // `exit` receives what `enter` returned, not the original pointer.
                        crate::CB.with(|cb| (cb.borrow().2)(entered));
                        polled
                    })
                    .await;
                    crate::CB.with(|cb| (cb.borrow().3)(token));
                    output
                }
            }
        })
    }

    /// Spawns the future produced by `f`; `f` itself is invoked inside the spawned task.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        let deferred = async move { f().await };
        spawn(deferred)
    }
}
125
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Runs `fut` to completion on a newly created compio runtime.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        let rt = Runtime::new().unwrap();
        rt.block_on(fut);
    }

    /// Hands the closure `f` to `compio_runtime::spawn_blocking` and wraps the returned
    /// handle in this module's [`JoinHandle`].
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(compio_runtime::spawn_blocking(f)),
        }
    }

    /// Spawns `f` on the compio runtime and returns its join handle.
    ///
    /// The thread's `before` hook from `crate::CB` is called right away. When it yields a
    /// pointer, every poll of `f` is bracketed by the `enter` and `exit` hooks, and the
    /// `after` hook runs once `f` has completed. When it yields `None`, `f` is awaited
    /// directly.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let fut = compio_runtime::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    // `exit` receives what `enter` returned, not the original pointer.
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                f.await
            }
        });

        JoinHandle { fut: Some(fut) }
    }

    /// Spawns the future produced by `f`; `f` itself is invoked inside the spawned task.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move { f().await })
    }

    /// Error returned when joining a task fails; carries no further detail.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Join handle wrapping a `compio_runtime::JoinHandle`.
    ///
    /// The inner handle sits in an `Option` so that `Drop` can move it out and detach it.
    pub struct JoinHandle<T> {
        fut: Option<compio_runtime::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the inner handle is finished; a missing inner handle counts as
        /// finished.
        pub fn is_finished(&self) -> bool {
            if let Some(hnd) = &self.fut {
                hnd.is_finished()
            } else {
                true
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Detach instead of dropping the inner handle (presumably so the task keeps
            // running after this handle goes away — confirm against the compio docs).
            // A destructor must not add a panic path, so a missing handle is ignored here,
            // the same way `is_finished` tolerates it.
            if let Some(fut) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle and maps any error it reports to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(
                // `fut` is only taken in `drop`, so it is still present whenever we are polled.
                ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
                    .map_err(|_| JoinError),
            )
        }
    }
}
253
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Runs `fut` to completion on a newly created neon runtime.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let rt = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            rt.driver_type().name()
        );

        rt.block_on(fut);
    }

    /// Hands the closure `f` to `ntex_neon::spawn_blocking` and wraps the returned handle in
    /// this module's [`JoinHandle`].
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(ntex_neon::spawn_blocking(f)),
        }
    }

    /// Spawns `f` on the neon runtime and returns the task wrapper.
    ///
    /// The thread's `before` hook from `crate::CB` is called right away. When it yields a
    /// pointer, every poll of `f` is bracketed by the `enter` and `exit` hooks, and the
    /// `after` hook runs once `f` has completed. When it yields `None`, `f` is awaited
    /// directly.
    #[inline]
    pub fn spawn<F>(f: F) -> Task<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let task = ntex_neon::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    // `exit` receives what `enter` returned, not the original pointer.
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                f.await
            }
        });

        Task { task: Some(task) }
    }

    /// Spawns the future produced by `f`; `f` itself is invoked inside the spawned task.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move { f().await })
    }

    /// Handle to a task started with [`spawn`], wrapping an `ntex_neon::Task`.
    ///
    /// The inner task sits in an `Option` so that `Drop` can move it out and detach it.
    pub struct Task<T> {
        task: Option<ntex_neon::Task<T>>,
    }

    impl<T> Task<T> {
        /// Reports whether the inner task is finished; a missing inner task counts as
        /// finished.
        pub fn is_finished(&self) -> bool {
            if let Some(hnd) = &self.task {
                hnd.is_finished()
            } else {
                true
            }
        }
    }

    impl<T> Drop for Task<T> {
        fn drop(&mut self) {
            // Detach instead of dropping the inner task (presumably so it keeps running
            // after this handle goes away — confirm against the ntex-neon docs).
            // A destructor must not add a panic path, so a missing task is ignored here,
            // the same way `is_finished` tolerates it.
            if let Some(task) = self.task.take() {
                task.detach();
            }
        }
    }

    impl<T> Future for Task<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner task; its output is always wrapped in `Ok`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // `task` is only taken in `drop`, so it is still present whenever we are polled.
            Poll::Ready(Ok(ready!(Pin::new(self.task.as_mut().unwrap()).poll(cx))))
        }
    }

    /// Error returned when joining a task fails; carries no further detail.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a closure started with [`spawn_blocking`], wrapping an
    /// `ntex_neon::JoinHandle`.
    ///
    /// `fut` is cleared once the handle has produced its result.
    pub struct JoinHandle<T> {
        fut: Option<ntex_neon::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` once this handle has been polled to completion.
        ///
        /// Completion is only observed through `poll`: a handle that has never been polled
        /// reports `false` even if the blocking closure has already returned.
        pub fn is_finished(&self) -> bool {
            self.fut.is_none()
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle, flattening both error layers it can report into
        /// [`JoinError`]. Polling again after completion yields `Err(JoinError)`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let Some(fut) = self.fut.as_mut() else {
                // The result was already handed out by an earlier poll.
                return Poll::Ready(Err(JoinError));
            };
            let outcome = ready!(Pin::new(fut).poll(cx));
            // Clear the slot so `is_finished` can report completion; previously nothing
            // ever cleared it and `is_finished` returned `false` forever.
            self.fut = None;
            Poll::Ready(
                outcome
                    .map_err(|_| JoinError)
                    .and_then(|result| result.map_err(|_| JoinError)),
            )
        }
    }
}
402
403#[cfg(feature = "tokio")]
404pub use self::tokio::*;
405
406#[cfg(feature = "compio")]
407pub use self::compio::*;
408
409#[cfg(feature = "neon")]
410pub use self::neon::*;
411
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Error type of the placeholder [`JoinHandle`]; carries no further detail.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl std::error::Error for JoinError {}

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    /// Placeholder join handle used when no runtime feature is enabled.
    ///
    /// Nothing in this module constructs one: `spawn` and `spawn_blocking` panic before
    /// returning.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `true`.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Not implemented; panics when polled.
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            todo!()
        }
    }

    /// Stand-in for the runtime entry point.
    ///
    /// # Panics
    ///
    /// Always panics, because no async runtime feature is enabled.
    pub fn block_on<F: Future<Output = ()>>(_fut: F) {
        panic!("async runtime is not configured");
    }

    /// Stand-in for task spawning.
    ///
    /// # Panics
    ///
    /// Always panics: not implemented without a runtime feature.
    pub fn spawn<F>(_fut: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        unimplemented!()
    }

    /// Stand-in for spawning blocking work.
    ///
    /// # Panics
    ///
    /// Always panics: not implemented without a runtime feature.
    pub fn spawn_blocking<F, T>(_f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        unimplemented!()
    }
}
471
472#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
473pub use self::no_rt::*;