1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14 static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
15 Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}), Box::new(|_| {}))
16 );
17}
18
19type TBefore = Box<dyn Fn() -> Option<*const ()>>;
20type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
21type TExit = Box<dyn Fn(*const ())>;
22type TAfter = Box<dyn Fn(*const ())>;
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
51#[allow(dead_code)]
52#[cfg(feature = "tokio")]
53mod tokio {
54 use std::future::{poll_fn, Future};
55 use tok_io::runtime::Handle;
56 pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
57
58 pub fn block_on<F: Future<Output = ()>>(fut: F) {
61 if let Ok(hnd) = Handle::try_current() {
62 log::debug!("Use existing tokio runtime and block on future");
63 hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
64 } else {
65 log::debug!("Create tokio runtime and block on future");
66
67 let rt = tok_io::runtime::Builder::new_current_thread()
68 .enable_all()
69 .build()
71 .unwrap();
72 tok_io::task::LocalSet::new().block_on(&rt, fut);
73 }
74 }
75
76 #[inline]
84 pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
85 where
86 F: Future + 'static,
87 {
88 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
89 tok_io::task::spawn_local(async move {
90 if let Some(ptr) = ptr {
91 tok_io::pin!(f);
92 let result = poll_fn(|ctx| {
93 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
94 let result = f.as_mut().poll(ctx);
95 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
96 result
97 })
98 .await;
99 crate::CB.with(|cb| (cb.borrow().3)(ptr));
100 result
101 } else {
102 f.await
103 }
104 })
105 }
106
107 #[inline]
115 pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
116 where
117 F: FnOnce() -> R + 'static,
118 R: Future + 'static,
119 {
120 spawn(async move { f().await })
121 }
122}
123
124#[allow(dead_code)]
125#[cfg(feature = "compio")]
126mod compio {
127 use std::task::{ready, Context, Poll};
128 use std::{fmt, future::poll_fn, future::Future, pin::Pin};
129
130 use compio_runtime::Runtime;
131
132 pub fn block_on<F: Future<Output = ()>>(fut: F) {
135 log::info!(
136 "Starting compio runtime, driver {:?}",
137 compio_driver::DriverType::current()
138 );
139 let rt = Runtime::new().unwrap();
140 rt.block_on(fut);
141 }
142
143 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
149 where
150 F: FnOnce() -> T + Send + Sync + 'static,
151 T: Send + 'static,
152 {
153 JoinHandle {
154 fut: Some(compio_runtime::spawn_blocking(f)),
155 }
156 }
157
158 #[inline]
166 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
167 where
168 F: Future + 'static,
169 {
170 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
171 let fut = compio_runtime::spawn(async move {
172 if let Some(ptr) = ptr {
173 let mut f = std::pin::pin!(f);
174 let result = poll_fn(|ctx| {
175 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
176 let result = f.as_mut().poll(ctx);
177 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
178 result
179 })
180 .await;
181 crate::CB.with(|cb| (cb.borrow().3)(ptr));
182 result
183 } else {
184 f.await
185 }
186 });
187
188 JoinHandle { fut: Some(fut) }
189 }
190
191 #[inline]
199 pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
200 where
201 F: FnOnce() -> R + 'static,
202 R: Future + 'static,
203 {
204 spawn(async move { f().await })
205 }
206
207 #[derive(Debug, Copy, Clone)]
208 pub struct JoinError;
209
210 impl fmt::Display for JoinError {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 write!(f, "JoinError")
213 }
214 }
215
216 impl std::error::Error for JoinError {}
217
218 pub struct JoinHandle<T> {
219 fut: Option<compio_runtime::JoinHandle<T>>,
220 }
221
222 impl<T> JoinHandle<T> {
223 pub fn is_finished(&self) -> bool {
224 if let Some(hnd) = &self.fut {
225 hnd.is_finished()
226 } else {
227 true
228 }
229 }
230 }
231
232 impl<T> Drop for JoinHandle<T> {
233 fn drop(&mut self) {
234 self.fut.take().unwrap().detach();
235 }
236 }
237
238 impl<T> Future for JoinHandle<T> {
239 type Output = Result<T, JoinError>;
240
241 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242 Poll::Ready(
243 ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
244 .map_err(|_| JoinError),
245 )
246 }
247 }
248}
249
250#[allow(dead_code)]
251#[cfg(feature = "neon")]
252mod neon {
253 use std::task::{ready, Context, Poll};
254 use std::{fmt, future::poll_fn, future::Future, pin::Pin};
255
256 use ntex_neon::Runtime;
257
258 pub fn block_on<F: Future<Output = ()>>(fut: F) {
261 let rt = Runtime::new().unwrap();
262 log::info!(
263 "Starting neon runtime, driver {:?}",
264 rt.driver().tp().name()
265 );
266
267 rt.block_on(fut);
268 }
269
270 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
276 where
277 F: FnOnce() -> T + Send + Sync + 'static,
278 T: Send + 'static,
279 {
280 JoinHandle {
281 fut: Some(ntex_neon::spawn_blocking(f)),
282 }
283 }
284
285 #[inline]
293 pub fn spawn<F>(f: F) -> Task<F::Output>
294 where
295 F: Future + 'static,
296 {
297 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
298 let task = ntex_neon::spawn(async move {
299 if let Some(ptr) = ptr {
300 let mut f = std::pin::pin!(f);
301 let result = poll_fn(|ctx| {
302 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
303 let result = f.as_mut().poll(ctx);
304 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
305 result
306 })
307 .await;
308 crate::CB.with(|cb| (cb.borrow().3)(ptr));
309 result
310 } else {
311 f.await
312 }
313 });
314
315 Task { task: Some(task) }
316 }
317
318 #[inline]
326 pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
327 where
328 F: FnOnce() -> R + 'static,
329 R: Future + 'static,
330 {
331 spawn(async move { f().await })
332 }
333
334 pub struct Task<T> {
336 task: Option<ntex_neon::Task<T>>,
337 }
338
339 impl<T> Task<T> {
340 pub fn is_finished(&self) -> bool {
341 if let Some(hnd) = &self.task {
342 hnd.is_finished()
343 } else {
344 true
345 }
346 }
347 }
348
349 impl<T> Drop for Task<T> {
350 fn drop(&mut self) {
351 self.task.take().unwrap().detach();
352 }
353 }
354
355 impl<T> Future for Task<T> {
356 type Output = Result<T, JoinError>;
357
358 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
359 Poll::Ready(Ok(ready!(Pin::new(self.task.as_mut().unwrap()).poll(cx))))
360 }
361 }
362
363 #[derive(Debug, Copy, Clone)]
364 pub struct JoinError;
365
366 impl fmt::Display for JoinError {
367 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
368 write!(f, "JoinError")
369 }
370 }
371
372 impl std::error::Error for JoinError {}
373
374 pub struct JoinHandle<T> {
375 fut: Option<ntex_neon::JoinHandle<T>>,
376 }
377
378 impl<T> JoinHandle<T> {
379 pub fn is_finished(&self) -> bool {
380 false
381 }
382 }
383
384 impl<T> Future for JoinHandle<T> {
385 type Output = Result<T, JoinError>;
386
387 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388 Poll::Ready(
389 ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
390 .map_err(|_| JoinError)
391 .and_then(|result| result.map_err(|_| JoinError)),
392 )
393 }
394 }
395}
396
397#[cfg(feature = "tokio")]
398pub use self::tokio::*;
399
400#[cfg(feature = "compio")]
401pub use self::compio::*;
402
403#[cfg(feature = "neon")]
404pub use self::neon::*;
405
406#[allow(dead_code)]
407#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
408mod no_rt {
409 use std::task::{Context, Poll};
410 use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
411
412 pub fn block_on<F: Future<Output = ()>>(_: F) {
415 panic!("async runtime is not configured");
416 }
417
418 pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
419 where
420 F: Future + 'static,
421 {
422 unimplemented!()
423 }
424
425 pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
426 where
427 F: FnOnce() -> T + Send + Sync + 'static,
428 T: Send + 'static,
429 {
430 unimplemented!()
431 }
432
433 #[allow(clippy::type_complexity)]
436 pub struct JoinHandle<T> {
437 t: PhantomData<T>,
438 }
439
440 impl<T> JoinHandle<T> {
441 pub fn is_finished(&self) -> bool {
442 true
443 }
444 }
445
446 impl<T> Future for JoinHandle<T> {
447 type Output = Result<T, JoinError>;
448
449 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
450 todo!()
451 }
452 }
453
454 #[derive(Debug, Copy, Clone)]
455 pub struct JoinError;
456
457 impl fmt::Display for JoinError {
458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459 write!(f, "JoinError")
460 }
461 }
462
463 impl std::error::Error for JoinError {}
464}
465
466#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
467pub use self::no_rt::*;