1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14 static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
15}
16
17struct Callbacks {
18 before: Box<dyn Fn() -> Option<*const ()>>,
19 enter: Box<dyn Fn(*const ()) -> *const ()>,
20 exit: Box<dyn Fn(*const ())>,
21 after: Box<dyn Fn(*const ())>,
22}
23
24struct Data {
25 cb: &'static Callbacks,
26 ptr: *const (),
27}
28
29impl Data {
30 fn load() -> Option<Data> {
31 let cb = CB.with(|cb| cb.get());
32
33 if let Some(cb) = unsafe { cb.as_ref() } {
34 if let Some(ptr) = (*cb.before)() {
35 return Some(Data { cb, ptr });
36 }
37 }
38 None
39 }
40
41 fn run<F, R>(&mut self, f: F) -> R
42 where
43 F: FnOnce() -> R,
44 {
45 let ptr = (*self.cb.enter)(self.ptr);
46 let result = f();
47 (*self.cb.exit)(ptr);
48 result
49 }
50}
51
52impl Drop for Data {
53 fn drop(&mut self) {
54 (*self.cb.after)(self.ptr)
55 }
56}
57
58pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69 before: FBefore,
70 enter: FEnter,
71 exit: FExit,
72 after: FAfter,
73) where
74 FBefore: Fn() -> Option<*const ()> + 'static,
75 FEnter: Fn(*const ()) -> *const () + 'static,
76 FExit: Fn(*const ()) + 'static,
77 FAfter: Fn(*const ()) + 'static,
78{
79 CB.with(|cb| {
80 if !cb.get().is_null() {
81 panic!("Spawn callbacks already set");
82 }
83
84 let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
85 before: Box::new(before),
86 enter: Box::new(enter),
87 exit: Box::new(exit),
88 after: Box::new(after),
89 }));
90 cb.replace(new);
91 });
92}
93
94pub unsafe fn spawn_cbs_try<FBefore, FEnter, FExit, FAfter>(
103 before: FBefore,
104 enter: FEnter,
105 exit: FExit,
106 after: FAfter,
107) -> bool
108where
109 FBefore: Fn() -> Option<*const ()> + 'static,
110 FEnter: Fn(*const ()) -> *const () + 'static,
111 FExit: Fn(*const ()) + 'static,
112 FAfter: Fn(*const ()) + 'static,
113{
114 CB.with(|cb| {
115 if !cb.get().is_null() {
116 false
117 } else {
118 unsafe {
119 spawn_cbs(before, enter, exit, after);
120 }
121 true
122 }
123 })
124}
125
126#[allow(dead_code)]
127#[cfg(feature = "tokio")]
128mod tokio {
129 use std::future::{Future, poll_fn};
130 pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};
131
132 pub fn block_on<F: Future<Output = ()>>(fut: F) {
135 if let Ok(hnd) = tok_io::runtime::Handle::try_current() {
136 log::debug!("Use existing tokio runtime and block on future");
137 hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
138 } else {
139 log::debug!("Create tokio runtime and block on future");
140
141 let rt = tok_io::runtime::Builder::new_current_thread()
142 .enable_all()
143 .build()
145 .unwrap();
146 tok_io::task::LocalSet::new().block_on(&rt, fut);
147 }
148 }
149
150 #[inline]
158 pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
159 where
160 F: Future + 'static,
161 {
162 let data = crate::Data::load();
163 if let Some(mut data) = data {
164 tok_io::task::spawn_local(async move {
165 tok_io::pin!(f);
166 poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
167 })
168 } else {
169 tok_io::task::spawn_local(f)
170 }
171 }
172
173 #[derive(Clone, Debug)]
174 pub struct Handle(tok_io::runtime::Handle);
176
177 impl Handle {
178 #[inline]
179 pub fn current() -> Self {
180 Self(tok_io::runtime::Handle::current())
181 }
182
183 #[inline]
184 pub fn notify(&self) {}
186
187 #[inline]
188 pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
193 where
194 F: Future + Send + 'static,
195 F::Output: Send + 'static,
196 {
197 self.0.spawn(future)
198 }
199 }
200}
201
202#[allow(dead_code)]
203#[cfg(feature = "compio")]
204mod compio {
205 use std::task::{Context, Poll, ready};
206 use std::{fmt, future::Future, future::poll_fn, pin::Pin};
207
208 use compio_driver::DriverType;
209 use compio_runtime::Runtime;
210
211 pub fn block_on<F: Future<Output = ()>>(fut: F) {
214 log::info!(
215 "Starting compio runtime, driver {:?}",
216 compio_runtime::Runtime::try_with_current(|rt| rt.driver_type())
217 .unwrap_or(DriverType::Poll)
218 );
219 let rt = Runtime::new().unwrap();
220 rt.block_on(fut);
221 }
222
223 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
229 where
230 F: FnOnce() -> T + Send + Sync + 'static,
231 T: Send + 'static,
232 {
233 JoinHandle {
234 fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
235 }
236 }
237
238 #[inline]
246 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
247 where
248 F: Future + 'static,
249 {
250 let fut = if let Some(mut data) = crate::Data::load() {
251 compio_runtime::spawn(async move {
252 let mut f = std::pin::pin!(f);
253 poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
254 })
255 } else {
256 compio_runtime::spawn(f)
257 };
258
259 JoinHandle {
260 fut: Some(Either::Compio(fut)),
261 }
262 }
263
264 #[derive(Debug, Copy, Clone)]
265 pub struct JoinError;
266
267 impl fmt::Display for JoinError {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 write!(f, "JoinError")
270 }
271 }
272
273 impl std::error::Error for JoinError {}
274
275 enum Either<T> {
276 Compio(compio_runtime::JoinHandle<T>),
277 Spawn(oneshot::Receiver<T>),
278 }
279
280 pub struct JoinHandle<T> {
281 fut: Option<Either<T>>,
282 }
283
284 impl<T> JoinHandle<T> {
285 pub fn is_finished(&self) -> bool {
286 match &self.fut {
287 Some(Either::Compio(fut)) => fut.is_finished(),
288 Some(Either::Spawn(fut)) => fut.is_closed(),
289 None => true,
290 }
291 }
292 }
293
294 impl<T> Drop for JoinHandle<T> {
295 fn drop(&mut self) {
296 if let Some(Either::Compio(fut)) = self.fut.take() {
297 fut.detach();
298 }
299 }
300 }
301
302 impl<T> Future for JoinHandle<T> {
303 type Output = Result<T, JoinError>;
304
305 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
306 Poll::Ready(match self.fut.as_mut() {
307 Some(Either::Compio(fut)) => {
308 ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
309 }
310 Some(Either::Spawn(fut)) => {
311 ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
312 }
313 None => Err(JoinError),
314 })
315 }
316 }
317
318 #[derive(Clone, Debug)]
319 pub struct Handle(crate::Arbiter);
320
321 impl Handle {
322 pub fn current() -> Self {
323 Self(crate::Arbiter::current())
324 }
325
326 pub fn notify(&self) {}
327
328 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
329 where
330 F: Future + Send + 'static,
331 F::Output: Send + 'static,
332 {
333 let (tx, rx) = oneshot::channel();
334 self.0.spawn(async move {
335 let result = future.await;
336 let _ = tx.send(result);
337 });
338 JoinHandle {
339 fut: Some(Either::Spawn(rx)),
340 }
341 }
342 }
343}
344
345#[allow(dead_code)]
346#[cfg(feature = "neon")]
347mod neon {
348 use std::task::{Context, Poll, ready};
349 use std::{fmt, future::Future, future::poll_fn, pin::Pin};
350
351 use ntex_neon::Runtime;
352
353 pub fn block_on<F: Future<Output = ()>>(fut: F) {
356 let rt = Runtime::new().unwrap();
357 log::info!(
358 "Starting neon runtime, driver {:?}",
359 rt.driver_type().name()
360 );
361
362 rt.block_on(fut);
363 }
364
365 #[inline]
373 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
374 where
375 F: Future + 'static,
376 {
377 let task = if let Some(mut data) = crate::Data::load() {
378 ntex_neon::spawn(async move {
379 let mut f = std::pin::pin!(f);
380 poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
381 })
382 } else {
383 ntex_neon::spawn(f)
384 };
385
386 JoinHandle {
387 task: Some(Either::Task(task)),
388 }
389 }
390
391 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
397 where
398 F: FnOnce() -> T + Send + Sync + 'static,
399 T: Send + 'static,
400 {
401 JoinHandle {
402 task: Some(Either::Blocking(ntex_neon::spawn_blocking(f))),
403 }
404 }
405
406 #[derive(Clone, Debug)]
407 pub struct Handle(ntex_neon::Handle);
408
409 impl Handle {
410 pub fn current() -> Self {
411 Self(ntex_neon::Handle::current())
412 }
413
414 pub fn notify(&self) {
415 let _ = self.0.notify();
416 }
417
418 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
419 where
420 F: Future + Send + 'static,
421 F::Output: Send + 'static,
422 {
423 JoinHandle {
424 task: Some(Either::Task(self.0.spawn(future))),
425 }
426 }
427 }
428
429 enum Either<T> {
430 Task(ntex_neon::Task<T>),
431 Blocking(ntex_neon::JoinHandle<T>),
432 }
433
434 pub struct JoinHandle<T> {
436 task: Option<Either<T>>,
437 }
438
439 impl<T> JoinHandle<T> {
440 pub fn is_finished(&self) -> bool {
441 match &self.task {
442 Some(Either::Task(fut)) => fut.is_finished(),
443 Some(Either::Blocking(fut)) => fut.is_closed(),
444 None => true,
445 }
446 }
447 }
448
449 impl<T> Drop for JoinHandle<T> {
450 fn drop(&mut self) {
451 if let Some(Either::Task(fut)) = self.task.take() {
452 fut.detach();
453 }
454 }
455 }
456
457 impl<T> Future for JoinHandle<T> {
458 type Output = Result<T, JoinError>;
459
460 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
461 Poll::Ready(match self.task.as_mut() {
462 Some(Either::Task(fut)) => Ok(ready!(Pin::new(fut).poll(cx))),
463 Some(Either::Blocking(fut)) => ready!(Pin::new(fut).poll(cx))
464 .map_err(|_| JoinError)
465 .and_then(|res| res.map_err(|_| JoinError)),
466 None => Err(JoinError),
467 })
468 }
469 }
470
471 #[derive(Debug, Copy, Clone)]
472 pub struct JoinError;
473
474 impl fmt::Display for JoinError {
475 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476 write!(f, "JoinError")
477 }
478 }
479
480 impl std::error::Error for JoinError {}
481}
482
483#[cfg(feature = "tokio")]
484pub use self::tokio::*;
485
486#[cfg(feature = "compio")]
487pub use self::compio::*;
488
489#[cfg(feature = "neon")]
490pub use self::neon::*;
491
492#[allow(dead_code)]
493#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
494mod no_rt {
495 use std::task::{Context, Poll};
496 use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
497
498 pub fn block_on<F: Future<Output = ()>>(_: F) {
501 panic!("async runtime is not configured");
502 }
503
504 pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
505 where
506 F: Future + 'static,
507 {
508 unimplemented!()
509 }
510
511 pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
512 where
513 F: FnOnce() -> T + Send + Sync + 'static,
514 T: Send + 'static,
515 {
516 unimplemented!()
517 }
518
519 #[allow(clippy::type_complexity)]
522 pub struct JoinHandle<T> {
523 t: PhantomData<T>,
524 }
525
526 impl<T> JoinHandle<T> {
527 pub fn is_finished(&self) -> bool {
528 true
529 }
530 }
531
532 impl<T> Future for JoinHandle<T> {
533 type Output = Result<T, JoinError>;
534
535 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
536 todo!()
537 }
538 }
539
540 #[derive(Debug, Copy, Clone)]
541 pub struct JoinError;
542
543 impl fmt::Display for JoinError {
544 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545 write!(f, "JoinError")
546 }
547 }
548
549 impl std::error::Error for JoinError {}
550
551 #[derive(Clone, Debug)]
552 pub struct Handle;
554
555 impl Handle {
556 #[inline]
557 pub fn current() -> Self {
558 Self
559 }
560
561 #[inline]
562 pub fn notify(&self) {}
564
565 #[inline]
566 pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
571 where
572 F: Future + Send + 'static,
573 F::Output: Send + 'static,
574 {
575 panic!("async runtime is not configured");
576 }
577 }
578}
579
580#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
581pub use self::no_rt::*;