1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14 static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
15}
16
/// Set of hooks invoked around spawned futures.
///
/// All hooks exchange an untyped `*const ()` value whose meaning is defined
/// entirely by whoever installed the callbacks.
struct Callbacks {
    /// Called by `Data::load`; returning `None` means no `Data` is created.
    before: Box<dyn Fn() -> Option<*const ()>>,
    /// Called by `Data::run` right before the wrapped closure; its return
    /// value is handed to `exit`.
    enter: Box<dyn Fn(*const ()) -> *const ()>,
    /// Called by `Data::run` right after the wrapped closure returns.
    exit: Box<dyn Fn(*const ())>,
    /// Called when a `Data` is dropped, with the pointer produced by `before`.
    after: Box<dyn Fn(*const ())>,
}
23
24struct Data {
25 cb: &'static Callbacks,
26 ptr: *const (),
27}
28
29impl Data {
30 fn load() -> Option<Data> {
31 let cb = CB.with(|cb| cb.get());
32
33 if let Some(cb) = unsafe { cb.as_ref() } {
34 if let Some(ptr) = (*cb.before)() {
35 return Some(Data { cb, ptr });
36 }
37 }
38 None
39 }
40
41 fn run<F, R>(&mut self, f: F) -> R
42 where
43 F: FnOnce() -> R,
44 {
45 let ptr = (*self.cb.enter)(self.ptr);
46 let result = f();
47 (*self.cb.exit)(ptr);
48 result
49 }
50}
51
52impl Drop for Data {
53 fn drop(&mut self) {
54 (*self.cb.after)(self.ptr)
55 }
56}
57
58pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69 before: FBefore,
70 enter: FEnter,
71 exit: FExit,
72 after: FAfter,
73) where
74 FBefore: Fn() -> Option<*const ()> + 'static,
75 FEnter: Fn(*const ()) -> *const () + 'static,
76 FExit: Fn(*const ()) + 'static,
77 FAfter: Fn(*const ()) + 'static,
78{
79 CB.with(|cb| {
80 let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
81 before: Box::new(before),
82 enter: Box::new(enter),
83 exit: Box::new(exit),
84 after: Box::new(after),
85 }));
86
87 if !cb.replace(new).is_null() {
88 panic!("Spawn callbacks already set");
89 }
90 });
91}
92
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{Future, poll_fn};
    pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};

    /// Runs `fut` to completion, blocking the current thread.
    ///
    /// The future is driven inside a tokio `LocalSet`. If a tokio runtime
    /// handle is already available it is reused; otherwise a new
    /// current-thread runtime with all drivers enabled is built first.
    ///
    /// # Panics
    ///
    /// Panics if building the new runtime fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match tok_io::runtime::Handle::try_current() {
            Ok(hnd) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                hnd.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let rt = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&rt, fut);
            }
        }
    }

    /// Spawns a future onto the current thread via `spawn_local`.
    ///
    /// When spawn callbacks are installed and their `before` hook yields a
    /// value, every poll of `f` is wrapped by the `enter`/`exit` hooks.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        match crate::Data::load() {
            Some(mut data) => tok_io::task::spawn_local(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => tok_io::task::spawn_local(f),
        }
    }

    /// Cloneable wrapper around a tokio runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Returns a handle to the tokio runtime of the current context.
        #[inline]
        pub fn current() -> Self {
            let inner = tok_io::runtime::Handle::current();
            Self(inner)
        }

        /// Does nothing for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a `Send` future on the wrapped runtime handle.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Self(rt) = self;
            rt.spawn(future)
        }
    }
}
168
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use compio_driver::DriverType;
    use compio_runtime::Runtime;

    /// Runs `fut` to completion on a freshly created compio runtime,
    /// blocking the current thread.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting compio runtime, driver {:?}",
            Runtime::try_with_current(|rt| rt.driver_type()).unwrap_or(DriverType::Poll)
        );
        let runtime = Runtime::new().unwrap();
        runtime.block_on(fut);
    }

    /// Runs the blocking closure `f` through `compio_runtime::spawn_blocking`
    /// and returns a handle to its result.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let handle = compio_runtime::spawn_blocking(f);
        JoinHandle {
            fut: Some(Either::Compio(handle)),
        }
    }

    /// Spawns a future on the compio runtime.
    ///
    /// When spawn callbacks are installed and their `before` hook yields a
    /// value, every poll of `f` is wrapped by the `enter`/`exit` hooks.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let handle = match crate::Data::load() {
            Some(mut data) => compio_runtime::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => compio_runtime::spawn(f),
        };

        JoinHandle {
            fut: Some(Either::Compio(handle)),
        }
    }

    /// Error returned when a spawned task's result cannot be obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Backing source of a [`JoinHandle`] result.
    enum Either<T> {
        /// Task handle produced directly by the compio runtime.
        Compio(compio_runtime::JoinHandle<T>),
        /// Receiving end of a oneshot channel fed by `Handle::spawn`.
        Spawn(oneshot::Receiver<T>),
    }

    /// Handle for awaiting the output of a spawned task.
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task handle is finished, or the
        /// oneshot channel is closed; `true` when there is no inner handle.
        pub fn is_finished(&self) -> bool {
            let Some(inner) = &self.fut else {
                return true;
            };
            match inner {
                Either::Compio(handle) => handle.is_finished(),
                Either::Spawn(rx) => rx.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Detaches a compio task handle on drop; a oneshot receiver is
        /// simply dropped.
        fn drop(&mut self) {
            match self.fut.take() {
                Some(Either::Compio(handle)) => {
                    handle.detach();
                }
                Some(Either::Spawn(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle, mapping any failure to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let Some(inner) = self.fut.as_mut() else {
                return Poll::Ready(Err(JoinError));
            };

            let outcome = match inner {
                Either::Compio(handle) => {
                    ready!(Pin::new(handle).poll(cx)).map_err(|_| JoinError)
                }
                Either::Spawn(rx) => ready!(Pin::new(rx).poll(cx)).map_err(|_| JoinError),
            };
            Poll::Ready(outcome)
        }
    }

    /// Cloneable handle that spawns futures through a [`crate::Arbiter`].
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Returns a handle wrapping the current arbiter.
        pub fn current() -> Self {
            let arbiter = crate::Arbiter::current();
            Self(arbiter)
        }

        /// Does nothing for the compio backend.
        pub fn notify(&self) {}

        /// Spawns a `Send` future on the wrapped arbiter.
        ///
        /// The future's output is delivered to the returned handle through
        /// a oneshot channel; a failed send is ignored.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let _ = tx.send(future.await);
            });

            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
311
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use ntex_neon::Runtime;

    /// Runs `fut` to completion on a freshly created neon runtime, blocking
    /// the current thread.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver_type().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns a future on the neon runtime.
    ///
    /// When spawn callbacks are installed and their `before` hook yields a
    /// value, every poll of `f` is wrapped by the `enter`/`exit` hooks.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let task = match crate::Data::load() {
            Some(mut data) => ntex_neon::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => ntex_neon::spawn(f),
        };

        JoinHandle {
            task: Some(Either::Task(task)),
        }
    }

    /// Runs the blocking closure `f` through `ntex_neon::spawn_blocking` and
    /// returns a handle to its result.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let handle = ntex_neon::spawn_blocking(f);
        JoinHandle {
            task: Some(Either::Blocking(handle)),
        }
    }

    /// Cloneable wrapper around a neon runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Returns a handle to the current neon runtime.
        pub fn current() -> Self {
            let inner = ntex_neon::Handle::current();
            Self(inner)
        }

        /// Notifies the wrapped runtime handle; the result is ignored.
        pub fn notify(&self) {
            let _ = self.0.notify();
        }

        /// Spawns a `Send` future on the wrapped runtime handle.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let task = self.0.spawn(future);
            JoinHandle {
                task: Some(Either::Task(task)),
            }
        }
    }

    /// Backing source of a [`JoinHandle`] result.
    enum Either<T> {
        /// Task produced by spawning a future.
        Task(ntex_neon::Task<T>),
        /// Handle produced by `spawn_blocking`.
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// Handle for awaiting the output of a spawned task.
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the task is finished, or the blocking handle is
        /// closed; `true` when there is no inner handle.
        pub fn is_finished(&self) -> bool {
            let Some(inner) = &self.task else {
                return true;
            };
            match inner {
                Either::Task(task) => task.is_finished(),
                Either::Blocking(handle) => handle.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Detaches a future task on drop; a blocking handle is simply
        /// dropped.
        fn drop(&mut self) {
            match self.task.take() {
                Some(Either::Task(task)) => {
                    task.detach();
                }
                Some(Either::Blocking(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle. A future task's output is returned as
        /// `Ok`; any failure of a blocking task is mapped to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let Some(inner) = self.task.as_mut() else {
                return Poll::Ready(Err(JoinError));
            };

            let outcome = match inner {
                Either::Task(task) => Ok(ready!(Pin::new(task).poll(cx))),
                Either::Blocking(handle) => {
                    let joined = ready!(Pin::new(handle).poll(cx));
                    joined
                        .map_err(|_| JoinError)
                        .and_then(|res| res.map_err(|_| JoinError))
                }
            };
            Poll::Ready(outcome)
        }
    }

    /// Error returned when a spawned task's result cannot be obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
449
450#[cfg(feature = "tokio")]
451pub use self::tokio::*;
452
453#[cfg(feature = "compio")]
454pub use self::compio::*;
455
456#[cfg(feature = "neon")]
457pub use self::neon::*;
458
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    //! Placeholder backend compiled when no async runtime feature is enabled.
    //!
    //! It mirrors the API of the real backends so the crate still builds, but
    //! every operation that would need a runtime panics with the same
    //! "async runtime is not configured" diagnostic.

    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Panics with the diagnostic shared by every entry point of this module.
    ///
    /// `#[track_caller]` keeps the reported panic location at the public
    /// function that was called rather than inside this helper.
    #[cold]
    #[track_caller]
    fn not_configured() -> ! {
        panic!("async runtime is not configured")
    }

    /// Always panics: there is no runtime to block on.
    ///
    /// # Panics
    ///
    /// Always, with "async runtime is not configured".
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Always panics: there is no runtime to spawn onto.
    ///
    /// # Panics
    ///
    /// Always, with "async runtime is not configured".
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Always panics: there is no runtime to run blocking work on.
    ///
    /// # Panics
    ///
    /// Always, with "async runtime is not configured".
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Stand-in join handle; no function in this module ever returns one.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `true`, since no task can be running.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Panics with the shared diagnostic instead of a misleading
        /// "not yet implemented" message.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            not_configured()
        }
    }

    /// Error type mirroring the `JoinError` of the real backends.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Stand-in runtime handle carrying no state.
    #[derive(Clone, Debug)]
    pub struct Handle;

    impl Handle {
        /// Returns the unit handle.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        /// Does nothing.
        #[inline]
        pub fn notify(&self) {}

        /// Always panics: there is no runtime to spawn onto.
        ///
        /// # Panics
        ///
        /// Always, with "async runtime is not configured".
        #[inline]
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            not_configured()
        }
    }
}
546
547#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
548pub use self::no_rt::*;