1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
thread_local! {
    /// Spawn callbacks registered for the current thread.
    ///
    /// Starts out null; `spawn_cbs` replaces it with a pointer to a leaked
    /// `Callbacks` value.
    static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
}

/// Set of type-erased hooks that are run around a spawned task.
///
/// The raw pointers are opaque to this code: they are only handed from one
/// hook to the next and are never dereferenced here.
struct Callbacks {
    /// Runs in `Data::load`; returning `None` means no `Data` is created.
    before: Box<dyn Fn() -> Option<*const ()>>,
    /// Runs at the start of `Data::run` with the pointer returned by `before`;
    /// its return value is what `exit` receives afterwards.
    enter: Box<dyn Fn(*const ()) -> *const ()>,
    /// Runs at the end of `Data::run` with the pointer returned by `enter`.
    exit: Box<dyn Fn(*const ())>,
    /// Runs when the `Data` is dropped, with the pointer returned by `before`.
    after: Box<dyn Fn(*const ())>,
}

/// Per-task state: the thread's callbacks plus the pointer `before` produced.
struct Data {
    cb: &'static Callbacks,
    ptr: *const (),
}

impl Data {
    /// Builds the per-task state from the callbacks registered on this thread.
    ///
    /// Returns `None` when no callbacks are registered, or when the `before`
    /// hook declines by returning `None`.
    fn load() -> Option<Data> {
        let raw = CB.with(|slot| slot.get());

        // SAFETY: within this file the slot is either null or holds the pointer
        // that `spawn_cbs` obtained from `Box::leak`, which is never freed.
        let cb: &'static Callbacks = unsafe { raw.as_ref() }?;
        let ptr = (cb.before)()?;
        Some(Data { cb, ptr })
    }

    /// Runs `f` between the `enter` and `exit` hooks and returns its result.
    ///
    /// `exit` is invoked from a drop guard, so it also runs when `f` unwinds.
    /// Without the guard a panicking closure would leave `enter` without a
    /// matching `exit`, even though `after` still runs when this `Data` is
    /// dropped.
    fn run<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        /// Calls the `exit` hook with the stored pointer when dropped.
        struct ExitGuard {
            cb: &'static Callbacks,
            ptr: *const (),
        }

        impl Drop for ExitGuard {
            fn drop(&mut self) {
                (self.cb.exit)(self.ptr)
            }
        }

        let _guard = ExitGuard {
            cb: self.cb,
            ptr: (self.cb.enter)(self.ptr),
        };
        // The result is produced first; the guard fires `exit` on the way out.
        f()
    }
}

impl Drop for Data {
    /// Hands the pointer obtained from `before` to the `after` hook.
    fn drop(&mut self) {
        (self.cb.after)(self.ptr)
    }
}
57
58pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69 before: FBefore,
70 enter: FEnter,
71 exit: FExit,
72 after: FAfter,
73) where
74 FBefore: Fn() -> Option<*const ()> + 'static,
75 FEnter: Fn(*const ()) -> *const () + 'static,
76 FExit: Fn(*const ()) + 'static,
77 FAfter: Fn(*const ()) + 'static,
78{
79 CB.with(|cb| {
80 if !cb.get().is_null() {
81 panic!("Spawn callbacks already set");
82 }
83
84 let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
85 before: Box::new(before),
86 enter: Box::new(enter),
87 exit: Box::new(exit),
88 after: Box::new(after),
89 }));
90 cb.replace(new);
91 });
92}
93
94pub unsafe fn spawn_cbs_try<FBefore, FEnter, FExit, FAfter>(
103 before: FBefore,
104 enter: FEnter,
105 exit: FExit,
106 after: FAfter,
107) -> bool
108where
109 FBefore: Fn() -> Option<*const ()> + 'static,
110 FEnter: Fn(*const ()) -> *const () + 'static,
111 FExit: Fn(*const ()) + 'static,
112 FAfter: Fn(*const ()) + 'static,
113{
114 CB.with(|cb| {
115 if !cb.get().is_null() {
116 false
117 } else {
118 unsafe {
119 spawn_cbs(before, enter, exit, after);
120 }
121 true
122 }
123 })
124}
125
/// Runtime primitives backed by tokio (referenced in this crate as `tok_io`).
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{Future, poll_fn};
    pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};

    /// Runs `fut` to completion on the current thread inside a `LocalSet`.
    ///
    /// When a tokio runtime handle is already current it is reused; otherwise
    /// a current-thread runtime with all drivers enabled is built first.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be built and the build fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match tok_io::runtime::Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns `f` as a local task with `spawn_local`.
    ///
    /// If the thread's spawn callbacks yield a `Data`, every poll of `f` goes
    /// through `Data::run`; otherwise `f` is spawned as is.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        match crate::Data::load() {
            Some(mut data) => tok_io::task::spawn_local(async move {
                tok_io::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => tok_io::task::spawn_local(f),
        }
    }

    /// Cloneable wrapper around a tokio runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Wraps the handle returned by `tok_io::runtime::Handle::current()`.
        #[inline]
        pub fn current() -> Self {
            Handle(tok_io::runtime::Handle::current())
        }

        /// Does nothing for this backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a `Send` future through the wrapped runtime handle.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Handle(inner) = self;
            inner.spawn(future)
        }
    }
}
200
/// Runtime primitives backed by `compio_runtime`.
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use compio_driver::DriverType;
    use compio_runtime::Runtime;

    /// Creates a compio runtime and runs `fut` to completion on it.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        // Build the runtime before logging so the message names the driver of
        // the runtime that will actually run `fut`. Querying the "current"
        // runtime before one exists would fall back to a hard-coded value.
        let rt = Runtime::new().unwrap();
        let driver: DriverType = rt.driver_type();
        log::info!("Starting compio runtime, driver {:?}", driver);
        rt.block_on(fut);
    }

    /// Runs the closure through `compio_runtime::spawn_blocking` and returns a
    /// handle to its result.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
        }
    }

    /// Spawns `f` with `compio_runtime::spawn`.
    ///
    /// If the thread's spawn callbacks yield a `Data`, every poll of `f` goes
    /// through `Data::run`; otherwise `f` is spawned as is.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let fut = if let Some(mut data) = crate::Data::load() {
            compio_runtime::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            })
        } else {
            compio_runtime::spawn(f)
        };

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error returned when a `JoinHandle` cannot produce the task's output.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Source of a task result.
    enum Either<T> {
        /// Handle returned by `compio_runtime::spawn` / `spawn_blocking`.
        Compio(compio_runtime::JoinHandle<T>),
        /// Receiving side of the channel used by `Handle::spawn`.
        Spawn(oneshot::Receiver<T>),
    }

    /// Handle to a spawned task; resolves to the task's output or `JoinError`.
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports `is_finished()` of a compio handle, `is_closed()` of a
        /// channel receiver, and `true` when no inner handle is held.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                Some(Either::Spawn(fut)) => fut.is_closed(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Calls `detach()` on a compio handle that is still held.
        // NOTE(review): presumably this keeps the task running after the
        // handle is dropped — confirm against the compio documentation.
        fn drop(&mut self) {
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle and maps any error it reports to `JoinError`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Cloneable handle that spawns futures through an `Arbiter`.
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Wraps the arbiter returned by `Arbiter::current()`.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Does nothing for this backend.
        pub fn notify(&self) {}

        /// Spawns a `Send` future on the wrapped arbiter.
        ///
        /// The output is delivered to the returned handle over a oneshot
        /// channel; a failed send is ignored.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
343
/// Runtime primitives backed by `ntex_neon`.
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use ntex_neon::Runtime;

    /// Creates a neon runtime, logs its driver name and runs `fut` on it.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver_type().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns `f` with `ntex_neon::spawn`.
    ///
    /// If the thread's spawn callbacks yield a `Data`, every poll of `f` goes
    /// through `Data::run`; otherwise `f` is spawned as is.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let task = match crate::Data::load() {
            Some(mut data) => ntex_neon::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => ntex_neon::spawn(f),
        };

        JoinHandle {
            task: Some(Either::Task(task)),
        }
    }

    /// Runs the closure through `ntex_neon::spawn_blocking` and returns a
    /// handle to its result.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let handle = ntex_neon::spawn_blocking(f);
        JoinHandle {
            task: Some(Either::Blocking(handle)),
        }
    }

    /// Cloneable wrapper around `ntex_neon::Handle`.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Wraps the handle returned by `ntex_neon::Handle::current()`.
        pub fn current() -> Self {
            Handle(ntex_neon::Handle::current())
        }

        /// Calls `notify()` on the wrapped handle and ignores its result.
        pub fn notify(&self) {
            let _ = self.0.notify();
        }

        /// Spawns a `Send` future through the wrapped handle.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let task = self.0.spawn(future);
            JoinHandle {
                task: Some(Either::Task(task)),
            }
        }
    }

    /// Source of a task result.
    enum Either<T> {
        /// Task created by `ntex_neon::spawn` or `ntex_neon::Handle::spawn`.
        Task(ntex_neon::Task<T>),
        /// Handle created by `ntex_neon::spawn_blocking`.
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// Handle to a spawned task; resolves to the task's output or `JoinError`.
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports `is_finished()` of a task, `is_closed()` of a blocking
        /// handle, and `true` when no inner handle is held.
        pub fn is_finished(&self) -> bool {
            self.task.as_ref().map_or(true, |inner| match inner {
                Either::Task(task) => task.is_finished(),
                Either::Blocking(handle) => handle.is_closed(),
            })
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Calls `detach()` on a task that is still held; a blocking handle is
        /// simply dropped.
        fn drop(&mut self) {
            match self.task.take() {
                Some(Either::Task(task)) => {
                    task.detach();
                }
                Some(Either::Blocking(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle; a task's output is returned as `Ok`, while
        /// either error layer of a blocking handle becomes `JoinError`.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.task.as_mut() {
                Some(Either::Task(task)) => Ok(ready!(Pin::new(task).poll(cx))),
                Some(Either::Blocking(handle)) => match ready!(Pin::new(handle).poll(cx)) {
                    Ok(Ok(value)) => Ok(value),
                    Ok(Err(_)) | Err(_) => Err(JoinError),
                },
                None => Err(JoinError),
            };
            Poll::Ready(outcome)
        }
    }

    /// Error returned when a `JoinHandle` cannot produce the task's output.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_fmt(format_args!("JoinError"))
        }
    }

    impl std::error::Error for JoinError {}
}
481
482#[cfg(feature = "tokio")]
483pub use self::tokio::*;
484
485#[cfg(feature = "compio")]
486pub use self::compio::*;
487
488#[cfg(feature = "neon")]
489pub use self::neon::*;
490
/// Placeholder backend compiled when no runtime feature is enabled.
///
/// It offers the same items as the real backends, but every entry point
/// panics with the same "async runtime is not configured" message.
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Single panic site shared by every entry point, so they all report the
    /// same message; `#[track_caller]` keeps the reported location at the
    /// calling entry point.
    #[cold]
    #[track_caller]
    fn not_configured() -> ! {
        panic!("async runtime is not configured");
    }

    /// Always panics: there is no runtime to drive the future.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Always panics: there is no runtime to spawn the future on.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Always panics: there is no runtime to run the closure on.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Stand-in join handle type; no function in this module ever returns a
    /// value of it, because they all panic first.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `true`.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Panics like the other entry points instead of the generic `todo!()`
        /// message.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            not_configured()
        }
    }

    /// Error type mirroring the `JoinError` of the real backends.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Stand-in runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle;

    impl Handle {
        /// Returns the unit handle.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        /// Does nothing.
        #[inline]
        pub fn notify(&self) {}

        /// Always panics: there is no runtime to spawn the future on.
        #[inline]
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            not_configured()
        }
    }
}
578
579#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
580pub use self::no_rt::*;