ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Callback run when a task is spawned; may hand back an opaque pointer for the task.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Callback run before each poll of a task; receives the task pointer and returns
/// the pointer that is later passed to the `exit` callback.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Callback run after each poll of a task; receives the pointer returned by `enter`.
type TExit = Box<dyn Fn(*const ())>;
/// Callback run when a task is finished; receives the pointer produced by `before`.
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    // Per-thread callback slot, stored as `(before, enter, exit, after)`.
    // The defaults do nothing: `before` yields no pointer, `enter` returns null.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = {
        let before: TBefore = Box::new(|| None);
        let enter: TEnter = Box::new(|_| ptr::null());
        let exit: TExit = Box::new(|_| {});
        let after: TAfter = Box::new(|_| {});
        RefCell::new((before, enter, exit, after))
    };
}
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Invokes the registered `exit` callback with the wrapped pointer when dropped.
    ///
    /// Using a guard keeps `enter`/`exit` calls balanced even if polling the
    /// wrapped future panics.
    struct ExitGuard(*const ());

    impl Drop for ExitGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // A destructor must not panic, so skip the callback when the
            // thread-local slot is unavailable or currently being replaced.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.2)(ptr);
                }
            });
        }
    }

    /// Invokes the registered `after` callback with the wrapped pointer when dropped.
    ///
    /// This hands the pointer produced by `before` back to the user no matter how
    /// the task ends: completion, panic, or the task being dropped unfinished.
    struct AfterGuard(*const ());

    impl Drop for AfterGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // See `ExitGuard::drop` for why the fallible accessors are used.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.3)(ptr);
                }
            });
        }
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// If a tokio runtime handle is already available it is reused; otherwise a
    /// new current-thread runtime is created. In both cases the future runs
    /// inside a fresh `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be created and building it fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        if let Ok(hnd) = tok_io::runtime::Handle::try_current() {
            log::debug!("Use existing tokio runtime and block on future");
            hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
        } else {
            log::debug!("Create tokio runtime and block on future");

            let rt = tok_io::runtime::Builder::new_current_thread()
                .enable_all()
                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
                .build()
                .unwrap();
            tok_io::task::LocalSet::new().block_on(&rt, fut);
        }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// When the `before` callback yields a pointer, every poll of the future is
    /// wrapped in the `enter`/`exit` callbacks and the pointer is passed to
    /// `after` once the task ends.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        // The guard is created before the task so that `after` still runs if the
        // task is dropped without ever being polled.
        let after = crate::CB.with(|cb| (cb.borrow().0)()).map(AfterGuard);
        tok_io::task::spawn_local(async move {
            if let Some(after) = after {
                let ptr = after.0;
                tok_io::pin!(f);
                let result = poll_fn(|ctx| {
                    // Dropped at the end of the closure, i.e. right after the poll.
                    let _exit = ExitGuard(crate::CB.with(|cb| (cb.borrow().1)(ptr)));
                    f.as_mut().poll(ctx)
                })
                .await;
                // Return the pointer to the user before yielding the result.
                drop(after);
                result
            } else {
                f.await
            }
        })
    }

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Returns a handle to the tokio runtime of the current context.
        #[inline]
        pub fn current() -> Self {
            Self(tok_io::runtime::Handle::current())
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
        ///
        /// Spawning a task enables the task to execute concurrently to other tasks.
        /// There is no guarantee that a spawned task will execute to completion.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            self.0.spawn(future)
        }
    }
}
134
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Invokes the registered `exit` callback with the wrapped pointer when dropped.
    ///
    /// Using a guard keeps `enter`/`exit` calls balanced even if polling the
    /// wrapped future panics.
    struct ExitGuard(*const ());

    impl Drop for ExitGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // A destructor must not panic, so skip the callback when the
            // thread-local slot is unavailable or currently being replaced.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.2)(ptr);
                }
            });
        }
    }

    /// Invokes the registered `after` callback with the wrapped pointer when dropped.
    ///
    /// This hands the pointer produced by `before` back to the user no matter how
    /// the task ends: completion, panic, or the task being dropped unfinished.
    struct AfterGuard(*const ());

    impl Drop for AfterGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // See `ExitGuard::drop` for why the fallible accessors are used.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.3)(ptr);
                }
            });
        }
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if the compio runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        // Create the runtime first so the log reports the driver that is
        // actually in use rather than a fallback value.
        let rt = Runtime::new().unwrap();
        log::info!("Starting compio runtime, driver {:?}", rt.driver_type());
        rt.block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
        }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// When the `before` callback yields a pointer, every poll of the future is
    /// wrapped in the `enter`/`exit` callbacks and the pointer is passed to
    /// `after` once the task ends.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        // The guard is created before the task so that `after` still runs if the
        // task is dropped without ever being polled.
        let after = crate::CB.with(|cb| (cb.borrow().0)()).map(AfterGuard);
        let fut = compio_runtime::spawn(async move {
            if let Some(after) = after {
                let ptr = after.0;
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    // Dropped at the end of the closure, i.e. right after the poll.
                    let _exit = ExitGuard(crate::CB.with(|cb| (cb.borrow().1)(ptr)));
                    f.as_mut().poll(ctx)
                })
                .await;
                // Return the pointer to the user before yielding the result.
                drop(after);
                result
            } else {
                f.await
            }
        });

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error returned when a spawned task did not produce a result.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Source of a task result: a compio task or a oneshot channel fed by an arbiter.
    enum Either<T> {
        Compio(compio_runtime::JoinHandle<T>),
        Spawn(oneshot::Receiver<T>),
    }

    /// Handle to a spawned task; resolves to the task's output or a [`JoinError`].
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` if the underlying task has finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                Some(Either::Spawn(fut)) => fut.is_closed(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Dropping the handle must not stop the task, so detach it instead.
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Any failure of the underlying handle is reported as `JoinError`.
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Handle to the runtime, backed by the current [`crate::Arbiter`].
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Returns a handle bound to the current arbiter.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the compio backend.
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task on the arbiter, returning a
        /// [`JoinHandle`] for it.
        ///
        /// The task's output is delivered through a oneshot channel; the handle
        /// resolves to [`JoinError`] if the sending side is dropped without
        /// sending a value.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                // The receiver may already be gone; the result is then discarded.
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
286
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Invokes the registered `exit` callback with the wrapped pointer when dropped.
    ///
    /// Using a guard keeps `enter`/`exit` calls balanced even if polling the
    /// wrapped future panics.
    struct ExitGuard(*const ());

    impl Drop for ExitGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // A destructor must not panic, so skip the callback when the
            // thread-local slot is unavailable or currently being replaced.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.2)(ptr);
                }
            });
        }
    }

    /// Invokes the registered `after` callback with the wrapped pointer when dropped.
    ///
    /// This hands the pointer produced by `before` back to the user no matter how
    /// the task ends: completion, panic, or the task being dropped unfinished.
    struct AfterGuard(*const ());

    impl Drop for AfterGuard {
        fn drop(&mut self) {
            let ptr = self.0;
            // See `ExitGuard::drop` for why the fallible accessors are used.
            let _ = crate::CB.try_with(|cb| {
                if let Ok(cb) = cb.try_borrow() {
                    (cb.3)(ptr);
                }
            });
        }
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if the neon runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let rt = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            rt.driver_type().name()
        );

        rt.block_on(fut);
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// When the `before` callback yields a pointer, every poll of the future is
    /// wrapped in the `enter`/`exit` callbacks and the pointer is passed to
    /// `after` once the task ends.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        // The guard is created before the task so that `after` still runs if the
        // task is dropped without ever being polled.
        let after = crate::CB.with(|cb| (cb.borrow().0)()).map(AfterGuard);
        let task = ntex_neon::spawn(async move {
            if let Some(after) = after {
                let ptr = after.0;
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    // Dropped at the end of the closure, i.e. right after the poll.
                    let _exit = ExitGuard(crate::CB.with(|cb| (cb.borrow().1)(ptr)));
                    f.as_mut().poll(ctx)
                })
                .await;
                // Return the pointer to the user before yielding the result.
                drop(after);
                result
            } else {
                f.await
            }
        });

        JoinHandle {
            task: Some(Either::Task(task)),
        }
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            task: Some(Either::Blocking(ntex_neon::spawn_blocking(f))),
        }
    }

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Returns a handle to the current neon runtime.
        pub fn current() -> Self {
            Self(ntex_neon::Handle::current())
        }

        /// Wake up runtime. The outcome of the notification is ignored.
        pub fn notify(&self) {
            let _ = self.0.notify();
        }

        /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            JoinHandle {
                task: Some(Either::Task(self.0.spawn(future))),
            }
        }
    }

    #[doc(hidden)]
    #[deprecated]
    pub type Task<T> = JoinHandle<T>;

    /// Source of a task result: an async task or a blocking-pool job.
    enum Either<T> {
        Task(ntex_neon::Task<T>),
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// A spawned task.
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` if the underlying task has finished.
        pub fn is_finished(&self) -> bool {
            match &self.task {
                Some(Either::Task(fut)) => fut.is_finished(),
                Some(Either::Blocking(fut)) => fut.is_closed(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Dropping the handle must not stop the task, so detach it instead.
            if let Some(Either::Task(fut)) = self.task.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(match self.task.as_mut() {
                Some(Either::Task(fut)) => Ok(ready!(Pin::new(fut).poll(cx))),
                // Both layers of failure from a blocking job collapse into `JoinError`.
                Some(Either::Blocking(fut)) => ready!(Pin::new(fut).poll(cx))
                    .map_err(|_| JoinError)
                    .and_then(|res| res.map_err(|_| JoinError)),
                None => Err(JoinError),
            })
        }
    }

    /// Error returned when a spawned task did not produce a result.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
437
438#[cfg(feature = "tokio")]
439pub use self::tokio::*;
440
441#[cfg(feature = "compio")]
442pub use self::compio::*;
443
444#[cfg(feature = "neon")]
445pub use self::neon::*;
446
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Panic message shared by every entry point of this placeholder backend.
    const NOT_CONFIGURED: &str = "async runtime is not configured";

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        panic!("{}", NOT_CONFIGURED);
    }

    /// Spawn a future on the current thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        panic!("{}", NOT_CONFIGURED);
    }

    /// Spawns a blocking task.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        panic!("{}", NOT_CONFIGURED);
    }

    /// Task completion future. It resolves with the result of the spawned task.
    ///
    /// No function in this module ever returns a value of this type, since
    /// every spawn function panics.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports the task as finished.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            panic!("{}", NOT_CONFIGURED);
        }
    }

    /// Error returned when a spawned task did not produce a result.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle;

    impl Handle {
        /// Returns a placeholder handle.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        /// Wake up runtime.
        ///
        /// This is a no-op: there is no runtime to wake.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
        ///
        /// # Panics
        ///
        /// Always panics: no async runtime feature is enabled.
        #[inline]
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            panic!("{}", NOT_CONFIGURED);
        }
    }
}
534
535#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
536pub use self::no_rt::*;