1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14 static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
15 Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}), Box::new(|_| {}))
16 );
17}
18
19type TBefore = Box<dyn Fn() -> Option<*const ()>>;
20type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
21type TExit = Box<dyn Fn(*const ())>;
22type TAfter = Box<dyn Fn(*const ())>;
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
51#[allow(dead_code)]
52#[cfg(feature = "tokio")]
53mod tokio {
54 use std::future::{poll_fn, Future};
55 pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
56
57 pub fn block_on<F: Future<Output = ()>>(fut: F) {
60 if let Ok(hnd) = tok_io::runtime::Handle::try_current() {
61 log::debug!("Use existing tokio runtime and block on future");
62 hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
63 } else {
64 log::debug!("Create tokio runtime and block on future");
65
66 let rt = tok_io::runtime::Builder::new_current_thread()
67 .enable_all()
68 .build()
70 .unwrap();
71 tok_io::task::LocalSet::new().block_on(&rt, fut);
72 }
73 }
74
75 #[inline]
83 pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
84 where
85 F: Future + 'static,
86 {
87 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
88 tok_io::task::spawn_local(async move {
89 if let Some(ptr) = ptr {
90 tok_io::pin!(f);
91 let result = poll_fn(|ctx| {
92 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
93 let result = f.as_mut().poll(ctx);
94 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
95 result
96 })
97 .await;
98 crate::CB.with(|cb| (cb.borrow().3)(ptr));
99 result
100 } else {
101 f.await
102 }
103 })
104 }
105
106 #[derive(Clone, Debug)]
107 pub struct Handle(tok_io::runtime::Handle);
109
110 impl Handle {
111 #[inline]
112 pub fn current() -> Self {
113 Self(tok_io::runtime::Handle::current())
114 }
115
116 #[inline]
117 pub fn notify(&self) {}
119
120 #[inline]
121 pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
126 where
127 F: Future + Send + 'static,
128 F::Output: Send + 'static,
129 {
130 self.0.spawn(future)
131 }
132 }
133}
134
135#[allow(dead_code)]
136#[cfg(feature = "compio")]
137mod compio {
138 use std::task::{ready, Context, Poll};
139 use std::{fmt, future::poll_fn, future::Future, pin::Pin};
140
141 use compio_driver::DriverType;
142 use compio_runtime::Runtime;
143
144 pub fn block_on<F: Future<Output = ()>>(fut: F) {
147 log::info!(
148 "Starting compio runtime, driver {:?}",
149 compio_runtime::Runtime::try_with_current(|rt| rt.driver_type())
150 .unwrap_or(DriverType::Poll)
151 );
152 let rt = Runtime::new().unwrap();
153 rt.block_on(fut);
154 }
155
156 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
162 where
163 F: FnOnce() -> T + Send + Sync + 'static,
164 T: Send + 'static,
165 {
166 JoinHandle {
167 fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
168 }
169 }
170
171 #[inline]
179 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
180 where
181 F: Future + 'static,
182 {
183 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
184 let fut = compio_runtime::spawn(async move {
185 if let Some(ptr) = ptr {
186 let mut f = std::pin::pin!(f);
187 let result = poll_fn(|ctx| {
188 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
189 let result = f.as_mut().poll(ctx);
190 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
191 result
192 })
193 .await;
194 crate::CB.with(|cb| (cb.borrow().3)(ptr));
195 result
196 } else {
197 f.await
198 }
199 });
200
201 JoinHandle {
202 fut: Some(Either::Compio(fut)),
203 }
204 }
205
206 #[derive(Debug, Copy, Clone)]
207 pub struct JoinError;
208
209 impl fmt::Display for JoinError {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 write!(f, "JoinError")
212 }
213 }
214
215 impl std::error::Error for JoinError {}
216
217 enum Either<T> {
218 Compio(compio_runtime::JoinHandle<T>),
219 Spawn(oneshot::Receiver<T>),
220 }
221
222 pub struct JoinHandle<T> {
223 fut: Option<Either<T>>,
224 }
225
226 impl<T> JoinHandle<T> {
227 pub fn is_finished(&self) -> bool {
228 match &self.fut {
229 Some(Either::Compio(fut)) => fut.is_finished(),
230 Some(Either::Spawn(fut)) => fut.is_closed(),
231 None => true,
232 }
233 }
234 }
235
236 impl<T> Drop for JoinHandle<T> {
237 fn drop(&mut self) {
238 if let Some(Either::Compio(fut)) = self.fut.take() {
239 fut.detach();
240 }
241 }
242 }
243
244 impl<T> Future for JoinHandle<T> {
245 type Output = Result<T, JoinError>;
246
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 Poll::Ready(match self.fut.as_mut() {
249 Some(Either::Compio(fut)) => {
250 ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
251 }
252 Some(Either::Spawn(fut)) => {
253 ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
254 }
255 None => Err(JoinError),
256 })
257 }
258 }
259
260 #[derive(Clone, Debug)]
261 pub struct Handle(crate::Arbiter);
262
263 impl Handle {
264 pub fn current() -> Self {
265 Self(crate::Arbiter::current())
266 }
267
268 pub fn notify(&self) {}
269
270 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
271 where
272 F: Future + Send + 'static,
273 F::Output: Send + 'static,
274 {
275 let (tx, rx) = oneshot::channel();
276 self.0.spawn(async move {
277 let result = future.await;
278 let _ = tx.send(result);
279 });
280 JoinHandle {
281 fut: Some(Either::Spawn(rx)),
282 }
283 }
284 }
285}
286
287#[allow(dead_code)]
288#[cfg(feature = "neon")]
289mod neon {
290 use std::task::{ready, Context, Poll};
291 use std::{fmt, future::poll_fn, future::Future, pin::Pin};
292
293 use ntex_neon::Runtime;
294
295 pub fn block_on<F: Future<Output = ()>>(fut: F) {
298 let rt = Runtime::new().unwrap();
299 log::info!(
300 "Starting neon runtime, driver {:?}",
301 rt.driver_type().name()
302 );
303
304 rt.block_on(fut);
305 }
306
307 #[inline]
315 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
316 where
317 F: Future + 'static,
318 {
319 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
320 let task = ntex_neon::spawn(async move {
321 if let Some(ptr) = ptr {
322 let mut f = std::pin::pin!(f);
323 let result = poll_fn(|ctx| {
324 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
325 let result = f.as_mut().poll(ctx);
326 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
327 result
328 })
329 .await;
330 crate::CB.with(|cb| (cb.borrow().3)(ptr));
331 result
332 } else {
333 f.await
334 }
335 });
336
337 JoinHandle {
338 task: Some(Either::Task(task)),
339 }
340 }
341
342 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
348 where
349 F: FnOnce() -> T + Send + Sync + 'static,
350 T: Send + 'static,
351 {
352 JoinHandle {
353 task: Some(Either::Blocking(ntex_neon::spawn_blocking(f))),
354 }
355 }
356
357 #[derive(Clone, Debug)]
358 pub struct Handle(ntex_neon::Handle);
359
360 impl Handle {
361 pub fn current() -> Self {
362 Self(ntex_neon::Handle::current())
363 }
364
365 pub fn notify(&self) {
366 let _ = self.0.notify();
367 }
368
369 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
370 where
371 F: Future + Send + 'static,
372 F::Output: Send + 'static,
373 {
374 JoinHandle {
375 task: Some(Either::Task(self.0.spawn(future))),
376 }
377 }
378 }
379
380 #[doc(hidden)]
381 #[deprecated]
382 pub type Task<T> = JoinHandle<T>;
383
384 enum Either<T> {
385 Task(ntex_neon::Task<T>),
386 Blocking(ntex_neon::JoinHandle<T>),
387 }
388
389 pub struct JoinHandle<T> {
391 task: Option<Either<T>>,
392 }
393
394 impl<T> JoinHandle<T> {
395 pub fn is_finished(&self) -> bool {
396 match &self.task {
397 Some(Either::Task(fut)) => fut.is_finished(),
398 Some(Either::Blocking(fut)) => fut.is_closed(),
399 None => true,
400 }
401 }
402 }
403
404 impl<T> Drop for JoinHandle<T> {
405 fn drop(&mut self) {
406 if let Some(Either::Task(fut)) = self.task.take() {
407 fut.detach();
408 }
409 }
410 }
411
412 impl<T> Future for JoinHandle<T> {
413 type Output = Result<T, JoinError>;
414
415 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
416 Poll::Ready(match self.task.as_mut() {
417 Some(Either::Task(fut)) => Ok(ready!(Pin::new(fut).poll(cx))),
418 Some(Either::Blocking(fut)) => ready!(Pin::new(fut).poll(cx))
419 .map_err(|_| JoinError)
420 .and_then(|res| res.map_err(|_| JoinError)),
421 None => Err(JoinError),
422 })
423 }
424 }
425
426 #[derive(Debug, Copy, Clone)]
427 pub struct JoinError;
428
429 impl fmt::Display for JoinError {
430 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431 write!(f, "JoinError")
432 }
433 }
434
435 impl std::error::Error for JoinError {}
436}
437
438#[cfg(feature = "tokio")]
439pub use self::tokio::*;
440
441#[cfg(feature = "compio")]
442pub use self::compio::*;
443
444#[cfg(feature = "neon")]
445pub use self::neon::*;
446
447#[allow(dead_code)]
448#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
449mod no_rt {
450 use std::task::{Context, Poll};
451 use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
452
453 pub fn block_on<F: Future<Output = ()>>(_: F) {
456 panic!("async runtime is not configured");
457 }
458
459 pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
460 where
461 F: Future + 'static,
462 {
463 unimplemented!()
464 }
465
466 pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
467 where
468 F: FnOnce() -> T + Send + Sync + 'static,
469 T: Send + 'static,
470 {
471 unimplemented!()
472 }
473
474 #[allow(clippy::type_complexity)]
477 pub struct JoinHandle<T> {
478 t: PhantomData<T>,
479 }
480
481 impl<T> JoinHandle<T> {
482 pub fn is_finished(&self) -> bool {
483 true
484 }
485 }
486
487 impl<T> Future for JoinHandle<T> {
488 type Output = Result<T, JoinError>;
489
490 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
491 todo!()
492 }
493 }
494
495 #[derive(Debug, Copy, Clone)]
496 pub struct JoinError;
497
498 impl fmt::Display for JoinError {
499 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
500 write!(f, "JoinError")
501 }
502 }
503
504 impl std::error::Error for JoinError {}
505
506 #[derive(Clone, Debug)]
507 pub struct Handle;
509
510 impl Handle {
511 #[inline]
512 pub fn current() -> Self {
513 Self
514 }
515
516 #[inline]
517 pub fn notify(&self) {}
519
520 #[inline]
521 pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
526 where
527 F: Future + Send + 'static,
528 F::Output: Send + 'static,
529 {
530 panic!("async runtime is not configured");
531 }
532 }
533}
534
535#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
536pub use self::no_rt::*;