1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Hook invoked once, at spawn time; returning `None` makes the spawned
/// future run without any of the other hooks.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook invoked right before every poll of a spawned future. It receives the
/// pointer produced by the `TBefore` hook and returns the pointer that is
/// later handed to the `TExit` hook.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook invoked right after every poll, with the pointer returned by `TEnter`.
type TExit = Box<dyn Fn(*const ())>;
/// Hook invoked once the spawned future has completed, with the pointer
/// produced by the `TBefore` hook.
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Per-thread set of spawn hooks, stored as `(before, enter, exit, after)`.
    ///
    /// The defaults are inert: `before` yields `None`, `enter` yields a null
    /// pointer, and `exit`/`after` do nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
        Box::new(|| None),
        Box::new(|_| ptr::null()),
        Box::new(|_| ()),
        Box::new(|_| ()),
    ));
}
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Drives `fut` to completion inside a tokio `LocalSet`.
    ///
    /// If a tokio runtime handle is available on the calling thread it is
    /// reused; otherwise a new current-thread runtime (built with
    /// `enable_all()`) is created for the duration of the call.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be built and building it fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match tok_io::runtime::Handle::try_current() {
            Ok(existing) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                existing.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns a `!Send`-capable future on the current thread via
    /// `spawn_local`, wrapping it with the thread's spawn hooks.
    ///
    /// The `before` hook runs synchronously here, before the task is created.
    /// When it yields a pointer, every poll of `f` is bracketed by the
    /// `enter`/`exit` hooks and the `after` hook runs once `f` has completed.
    /// When it yields `None`, `f` is awaited directly.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());
        tok_io::task::spawn_local(async move {
            match token {
                None => f.await,
                Some(token) => {
                    tok_io::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(entered));
                        polled
                    })
                    .await;
                    // Only reached when the future ran to completion.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        })
    }

    /// Cloneable wrapper around a tokio runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Captures the handle of the tokio runtime the caller is running on.
        #[inline]
        pub fn current() -> Self {
            Handle(tok_io::runtime::Handle::current())
        }

        /// Does nothing for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a `Send` future onto the wrapped runtime and returns
        /// tokio's join handle for it.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Handle(inner) = self;
            inner.spawn(future)
        }
    }
}
134
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Creates a compio runtime and drives `fut` to completion on it.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        let rt = Runtime::new().unwrap();
        rt.block_on(fut);
    }

    /// Hands the closure `f` to compio's `spawn_blocking` and wraps the
    /// resulting handle in this module's [`JoinHandle`].
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
        }
    }

    /// Spawns a future on the compio runtime, wrapping it with the thread's
    /// spawn hooks.
    ///
    /// The `before` hook runs synchronously here, before the task is created.
    /// When it yields a pointer, every poll of `f` is bracketed by the
    /// `enter`/`exit` hooks and the `after` hook runs once `f` has completed.
    /// When it yields `None`, `f` is awaited directly.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let fut = compio_runtime::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                // Only reached when the future ran to completion.
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                f.await
            }
        });

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error produced by [`JoinHandle`] when the task's output cannot be
    /// obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// The two ways a task result can be delivered to a [`JoinHandle`].
    enum Either<T> {
        /// Task owned by the compio runtime.
        Compio(compio_runtime::JoinHandle<T>),
        /// Task spawned through [`Handle::spawn`]; its result arrives over a
        /// oneshot channel.
        Spawn(oneshot::Receiver<T>),
    }

    /// Handle to a spawned task; resolves to the task's output or a
    /// [`JoinError`].
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task has finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                // `is_closed` alone is only true when the sender went away
                // without sending or the message was already taken. A result
                // that is waiting in the channel (task done, not yet joined)
                // is reported by `has_message`, so both must be checked.
                Some(Either::Spawn(fut)) => fut.is_closed() || fut.has_message(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // NOTE(review): presumably dropping compio's handle would cancel
            // the task, hence the explicit detach - confirm against compio.
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the wrapped handle/receiver; any failure of the underlying
        /// source is collapsed into [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Cloneable handle that spawns work through an [`crate::Arbiter`].
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Captures the current arbiter.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Does nothing for the compio backend.
        pub fn notify(&self) {}

        /// Spawns a `Send` future on the wrapped arbiter and relays its output
        /// back through a oneshot channel.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                // The receiver may already be gone; the result is then
                // intentionally discarded.
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
284
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Creates a neon runtime, logs its driver name and drives `fut` to
    /// completion on it.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver_type().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns a future on the neon runtime, wrapping it with the thread's
    /// spawn hooks.
    ///
    /// The `before` hook runs synchronously here, before the task is created.
    /// When it yields a pointer, every poll of `f` is bracketed by the
    /// `enter`/`exit` hooks and the `after` hook runs once `f` has completed.
    /// When it yields `None`, `f` is awaited directly.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());
        let spawned = ntex_neon::spawn(async move {
            let Some(token) = token else {
                return f.await;
            };

            let mut pinned = std::pin::pin!(f);
            let output = poll_fn(|cx| {
                let entered = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                let polled = pinned.as_mut().poll(cx);
                crate::CB.with(|hooks| (hooks.borrow().2)(entered));
                polled
            })
            .await;
            // Only reached when the future ran to completion.
            crate::CB.with(|hooks| (hooks.borrow().3)(token));
            output
        });

        JoinHandle {
            task: Some(Either::Task(spawned)),
        }
    }

    /// Hands the closure `f` to neon's `spawn_blocking` and wraps the
    /// resulting handle in this module's [`JoinHandle`].
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let blocking = ntex_neon::spawn_blocking(f);
        JoinHandle {
            task: Some(Either::Blocking(blocking)),
        }
    }

    /// Cloneable wrapper around a neon runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Captures the handle of the current neon runtime.
        pub fn current() -> Self {
            Handle(ntex_neon::Handle::current())
        }

        /// Forwards to the wrapped handle's `notify`, discarding its result.
        pub fn notify(&self) {
            let Handle(inner) = self;
            let _ = inner.notify();
        }

        /// Spawns a `Send` future through the wrapped handle.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Handle(inner) = self;
            let spawned = inner.spawn(future);
            JoinHandle {
                task: Some(Either::Task(spawned)),
            }
        }
    }

    /// Former name of [`JoinHandle`], kept as a deprecated alias.
    #[doc(hidden)]
    #[deprecated]
    pub type Task<T> = JoinHandle<T>;

    /// The two kinds of neon work a [`JoinHandle`] can track.
    enum Either<T> {
        /// Async task created by `spawn`.
        Task(ntex_neon::Task<T>),
        /// Closure submitted through `spawn_blocking`.
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// Handle to a spawned task; resolves to the task's output or a
    /// [`JoinError`].
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying work has finished, as told by the
        /// wrapped neon handle (`is_finished` for tasks, `is_closed` for
        /// blocking jobs).
        pub fn is_finished(&self) -> bool {
            match self.task.as_ref() {
                None => true,
                Some(Either::Task(task)) => task.is_finished(),
                Some(Either::Blocking(job)) => job.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Async tasks are detached explicitly; blocking jobs are simply
            // dropped.
            match self.task.take() {
                Some(Either::Task(task)) => {
                    task.detach();
                }
                Some(Either::Blocking(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the wrapped handle. Async tasks always yield `Ok`; both error
        /// layers of a blocking job are collapsed into [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let Some(inner) = self.task.as_mut() else {
                return Poll::Ready(Err(JoinError));
            };

            match inner {
                Either::Task(task) => Pin::new(task).poll(cx).map(Ok),
                Either::Blocking(job) => match ready!(Pin::new(job).poll(cx)) {
                    Ok(Ok(value)) => Poll::Ready(Ok(value)),
                    _ => Poll::Ready(Err(JoinError)),
                },
            }
        }
    }

    /// Error produced by [`JoinHandle`] when the task's output cannot be
    /// obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(out, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
435
436#[cfg(feature = "tokio")]
437pub use self::tokio::*;
438
439#[cfg(feature = "compio")]
440pub use self::compio::*;
441
442#[cfg(feature = "neon")]
443pub use self::neon::*;
444
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    //! Fallback backend compiled when none of the `tokio`, `compio` or `neon`
    //! features is enabled. Every entry point that would need a runtime
    //! panics with the same diagnostic.

    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Single diagnostic shared by all stubs so the failure cause is obvious
    /// regardless of which entry point was hit first.
    const NO_RUNTIME: &str = "async runtime is not configured";

    /// Stub for running a future to completion.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        panic!("{}", NO_RUNTIME);
    }

    /// Stub for spawning a future on the current thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        panic!("{}", NO_RUNTIME);
    }

    /// Stub for running a blocking closure off the async thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        panic!("{}", NO_RUNTIME);
    }

    /// Placeholder join handle; none of the spawn stubs ever returns one.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always `true`: there is no task that could still be running.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// # Panics
        ///
        /// Always panics: no async runtime feature is enabled.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            panic!("{}", NO_RUNTIME);
        }
    }

    /// Error type mirroring the `JoinError` of the real backends.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Placeholder runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle;

    impl Handle {
        /// Returns the unit handle; nothing is captured.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        /// Does nothing.
        #[inline]
        pub fn notify(&self) {}

        /// Stub for spawning a `Send` future.
        ///
        /// # Panics
        ///
        /// Always panics: no async runtime feature is enabled.
        #[inline]
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            panic!("{}", NO_RUNTIME);
        }
    }
}
532
533#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
534pub use self::no_rt::*;