1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Hook invoked once when a task is spawned. Returning `None` disables the
/// remaining hooks for that task; `Some(ptr)` hands an opaque pointer to them.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook invoked right before every poll of a task; receives the pointer
/// produced by the `TBefore` hook and returns a pointer for the `TExit` hook.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook invoked right after every poll of a task; receives the pointer
/// returned by the matching `TEnter` call.
type TExit = Box<dyn Fn(*const ())>;
/// Hook invoked once after a task's future has completed; receives the
/// pointer produced by the `TBefore` hook.
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Per-thread spawn hooks, stored as `(before, enter, exit, after)`.
    ///
    /// The defaults are no-ops: `before` yields `None`, `enter` yields a null
    /// pointer, and `exit`/`after` do nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = {
        let before: TBefore = Box::new(|| None);
        let enter: TEnter = Box::new(|_| ptr::null());
        let exit: TExit = Box::new(|_| {});
        let after: TAfter = Box::new(|_| {});
        RefCell::new((before, enter, exit, after))
    };
}
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
/// Runtime glue backed by tokio; compiled only with the `tokio` feature.
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    use tok_io::runtime::Handle;
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Runs the provided future, blocking the current thread until it
    /// completes.
    ///
    /// If a tokio runtime handle is already available it is reused;
    /// otherwise a new current-thread runtime with all drivers enabled is
    /// built. In both cases the future runs inside a fresh `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be built and building it fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns a future on the current thread via `spawn_local`.
    ///
    /// The thread's `before` hook is called immediately. When it returns a
    /// pointer, every poll of `f` is bracketed by the `enter`/`exit` hooks
    /// and the `after` hook runs once `f` has completed.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|cb| (cb.borrow().0)());
        tok_io::task::spawn_local(async move {
            match token {
                // No pointer from `before`: run the future untouched.
                None => f.await,
                Some(token) => {
                    tok_io::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|cb| (cb.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        crate::CB.with(|cb| (cb.borrow().2)(entered));
                        polled
                    })
                    .await;
                    crate::CB.with(|cb| (cb.borrow().3)(token));
                    output
                }
            }
        })
    }

    /// Spawns the future produced by `f` on the current thread.
    ///
    /// `f` is not called here; it is called inside the spawned task.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        let deferred = async move {
            let fut = f();
            fut.await
        };
        spawn(deferred)
    }
}
123
/// Runtime glue backed by compio; compiled only with the `compio` feature.
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Runs the provided future on a newly created compio runtime, blocking
    /// the current thread until it completes.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        let rt = Runtime::new().unwrap();
        rt.block_on(fut);
    }

    /// Hands the closure to `compio_runtime::spawn_blocking` and wraps the
    /// returned handle.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(compio_runtime::spawn_blocking(f)),
        }
    }

    /// Spawns a future via `compio_runtime::spawn`.
    ///
    /// The thread's `before` hook is called immediately. When it returns a
    /// pointer, every poll of `f` is bracketed by the `enter`/`exit` hooks
    /// and the `after` hook runs once `f` has completed.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let fut = compio_runtime::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    // `exit` receives whatever `enter` returned for this poll.
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                // No pointer from `before`: run the future untouched.
                f.await
            }
        });

        JoinHandle { fut: Some(fut) }
    }

    /// Spawns the future produced by `f`.
    ///
    /// `f` is not called here; it is called inside the spawned task.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move { f().await })
    }

    /// Error returned when awaiting a [`JoinHandle`] whose task did not
    /// produce a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a task started with [`spawn`], [`spawn_fn`] or
    /// [`spawn_blocking`].
    ///
    /// Awaiting it yields the task's output, or [`JoinError`] if the
    /// underlying compio handle reports an error. Dropping it detaches the
    /// underlying handle.
    pub struct JoinHandle<T> {
        // Held in an `Option` so `Drop` can move the handle out to detach it.
        fut: Option<compio_runtime::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` if the task has finished, or if the inner handle
        /// is no longer present.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(hnd) => hnd.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Never panic in a destructor: tolerate an already-taken handle
            // instead of unwrapping it.
            // NOTE(review): `detach` presumably lets the task keep running
            // after the handle is gone — confirm against compio's docs.
            if let Some(hnd) = self.fut.take() {
                hnd.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The handle is only ever removed in `Drop`, so it is present
            // for as long as the value can be polled.
            let hnd = self
                .fut
                .as_mut()
                .expect("JoinHandle polled after its inner handle was taken");
            let outcome = ready!(Pin::new(hnd).poll(cx));
            Poll::Ready(outcome.map_err(|_| JoinError))
        }
    }
}
249
/// Runtime glue backed by ntex-neon; compiled only with the `neon` feature.
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Runs the provided future on a newly created neon runtime, blocking
    /// the current thread until it completes.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting neon runtime, driver {:?}",
            ntex_neon::driver::DriverType::current()
        );
        let rt = Runtime::new().unwrap();
        rt.block_on(fut);
    }

    /// Hands the closure to `ntex_neon::spawn_blocking` and wraps the
    /// returned handle.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(ntex_neon::spawn_blocking(f)),
        }
    }

    /// Spawns a future via `ntex_neon::spawn`.
    ///
    /// The thread's `before` hook is called immediately. When it returns a
    /// pointer, every poll of `f` is bracketed by the `enter`/`exit` hooks
    /// and the `after` hook runs once `f` has completed.
    #[inline]
    pub fn spawn<F>(f: F) -> Task<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let task = ntex_neon::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    // `exit` receives whatever `enter` returned for this poll.
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                // No pointer from `before`: run the future untouched.
                f.await
            }
        });

        Task { task: Some(task) }
    }

    /// Spawns the future produced by `f`.
    ///
    /// `f` is not called here; it is called inside the spawned task.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move { f().await })
    }

    /// Handle to a task started with [`spawn`] or [`spawn_fn`].
    ///
    /// Awaiting it always yields `Ok` with the task's output. Dropping it
    /// detaches the underlying task handle.
    pub struct Task<T> {
        // Held in an `Option` so `Drop` can move the handle out to detach it.
        task: Option<ntex_neon::Task<T>>,
    }

    impl<T> Task<T> {
        /// Returns `true` if the task has finished, or if the inner handle
        /// is no longer present.
        pub fn is_finished(&self) -> bool {
            match &self.task {
                Some(hnd) => hnd.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for Task<T> {
        fn drop(&mut self) {
            // Never panic in a destructor: tolerate an already-taken handle
            // instead of unwrapping it.
            // NOTE(review): `detach` presumably lets the task keep running
            // after the handle is gone — confirm against ntex-neon's docs.
            if let Some(hnd) = self.task.take() {
                hnd.detach();
            }
        }
    }

    impl<T> Future for Task<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The handle is only ever removed in `Drop`, so it is present
            // for as long as the value can be polled.
            let hnd = self
                .task
                .as_mut()
                .expect("Task polled after its inner handle was taken");
            Poll::Ready(Ok(ready!(Pin::new(hnd).poll(cx))))
        }
    }

    /// Error type reported by [`Task`] and [`JoinHandle`].
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a blocking job started with [`spawn_blocking`].
    ///
    /// Awaiting it yields the job's output, or [`JoinError`] if the
    /// underlying neon handle reports an error. Dropping it detaches the
    /// underlying handle.
    pub struct JoinHandle<T> {
        // Held in an `Option` so `Drop` can move the handle out to detach it.
        fut: Option<ntex_neon::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` if the job has finished, or if the inner handle
        /// is no longer present.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(hnd) => hnd.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Same reasoning as `Task`: detach if present, never panic.
            if let Some(hnd) = self.fut.take() {
                hnd.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The handle is only ever removed in `Drop`, so it is present
            // for as long as the value can be polled.
            let hnd = self
                .fut
                .as_mut()
                .expect("JoinHandle polled after its inner handle was taken");
            let outcome = ready!(Pin::new(hnd).poll(cx));
            Poll::Ready(outcome.map_err(|_| JoinError))
        }
    }
}
404
405#[cfg(feature = "tokio")]
406pub use self::tokio::*;
407
408#[cfg(feature = "compio")]
409pub use self::compio::*;
410
411#[cfg(feature = "neon")]
412pub use self::neon::*;
413
/// Fallback used when none of the `tokio`, `compio` or `neon` features is
/// enabled. It exposes the same function names as the real runtimes, but
/// every entry point panics because there is nothing to run futures on.
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Single failure path shared by every entry point, so the panic message
    /// is the same no matter which function was called.
    #[cold]
    fn not_configured() -> ! {
        panic!("async runtime is not configured")
    }

    /// Always panics: no async runtime is configured.
    ///
    /// # Panics
    ///
    /// Panics unconditionally.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Always panics: no async runtime is configured.
    ///
    /// # Panics
    ///
    /// Panics unconditionally.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Always panics: no async runtime is configured.
    ///
    /// Present so the fallback offers the same `spawn_fn` entry point as the
    /// real runtime modules.
    ///
    /// # Panics
    ///
    /// Panics unconditionally; `f` is never called.
    pub fn spawn_fn<F, R>(_: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        not_configured()
    }

    /// Always panics: no async runtime is configured.
    ///
    /// # Panics
    ///
    /// Panics unconditionally; `f` is never called.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Placeholder task handle.
    ///
    /// No function in this module ever returns one (they all panic first);
    /// the type exists so signatures match the real runtimes.
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always returns `true`: there is no task that could still be
        /// running.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Always panics: without a runtime there is no task to wait for.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            not_configured()
        }
    }

    /// Error type of [`JoinHandle`]'s output.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
473
474#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
475pub use self::no_rt::*;