ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Hook run when a task is spawned; returning `Some(ptr)` enables the other hooks for it.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook run before every poll of a task; receives the pointer produced by `TBefore`.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook run after every poll of a task; receives the pointer returned by `TEnter`.
type TExit = Box<dyn Fn(*const ())>;
/// Hook run once a task's future has completed; receives the pointer produced by `TBefore`.
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Spawn hooks of the current thread, laid out as `(before, enter, exit, after)`.
    ///
    /// Until `spawn_cbs` installs something else the hooks do nothing: `before` yields
    /// `None`, `enter` yields a null pointer and `exit`/`after` ignore their argument.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = {
        let before: TBefore = Box::new(|| None);
        let enter: TEnter = Box::new(|_| ptr::null());
        let exit: TExit = Box::new(|_| ());
        let after: TAfter = Box::new(|_| ());
        RefCell::new((before, enter, exit, after))
    };
}
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};

    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Drives `fut` to completion, blocking the current thread until it is done.
    ///
    /// When a tokio runtime handle is already available on this thread it is reused;
    /// otherwise a fresh current-thread runtime (all drivers enabled) is built. In both
    /// cases the future runs inside a new `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime is needed and cannot be built.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match tok_io::runtime::Handle::try_current() {
            Ok(hnd) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                hnd.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let mut builder = tok_io::runtime::Builder::new_current_thread();
                builder.enable_all();
                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
                let rt = builder.build().unwrap();

                let local = tok_io::task::LocalSet::new();
                local.block_on(&rt, fut);
            }
        }
    }

    /// Spawns a future onto the current thread.
    ///
    /// No new Arbiter (or Arbiter address) is created; this is only a convenience for
    /// running a future on the thread that calls it. The thread's spawn hooks are
    /// applied: `before` is consulted right away, and if it returns a pointer, `enter`
    /// and `exit` wrap every poll and `after` runs once the future has completed.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());

        tok_io::task::spawn_local(async move {
            match token {
                // `before` opted out: run the future without any hooks.
                None => f.await,
                Some(token) => {
                    tok_io::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(entered));
                        polled
                    })
                    .await;
                    // Hand the pointer back now that the future is finished.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        })
    }

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Returns a handle to the tokio runtime that is current for this thread.
        #[inline]
        pub fn current() -> Self {
            let inner = tok_io::runtime::Handle::current();
            Self(inner)
        }

        /// Wake up runtime. This is a no-op for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task on the wrapped runtime, returning a
        /// `JoinHandle` for it.
        ///
        /// Spawning a task enables the task to execute concurrently to other tasks.
        /// There is no guarantee that a spawned task will execute to completion.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Self(rt) = self;
            rt.spawn(future)
        }
    }
}
134
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::{
        fmt,
        future::{poll_fn, Future},
        pin::Pin,
        task::{ready, Context, Poll},
    };

    use compio_runtime::Runtime;

    /// Error returned by [`JoinHandle`] when the task's result cannot be obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// The two sources a [`JoinHandle`] can wait on.
    enum Either<T> {
        /// Task or blocking job owned by the compio runtime.
        Compio(compio_runtime::JoinHandle<T>),
        /// Result delivered through a oneshot channel (see [`Handle::spawn`]).
        Spawn(oneshot::Receiver<T>),
    }

    /// Future resolving to the output of a spawned task or blocking job.
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying handle considers the work finished.
        ///
        /// A compio handle is asked via `is_finished`, a channel receiver via
        /// `is_closed`; a handle without any inner source counts as finished.
        pub fn is_finished(&self) -> bool {
            match self.fut.as_ref() {
                None => true,
                Some(Either::Compio(handle)) => handle.is_finished(),
                Some(Either::Spawn(rx)) => rx.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Detaches a compio task on drop; a channel receiver is simply dropped.
        fn drop(&mut self) {
            match self.fut.take() {
                Some(Either::Compio(handle)) => {
                    handle.detach();
                }
                Some(Either::Spawn(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle; any failure it reports is mapped to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.fut.as_mut() {
                None => Err(JoinError),
                Some(Either::Compio(handle)) => {
                    ready!(Pin::new(handle).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(rx)) => {
                    ready!(Pin::new(rx).poll(cx)).map_err(|_| JoinError)
                }
            };
            Poll::Ready(outcome)
        }
    }

    /// Runs the provided future on a new compio runtime, blocking the current thread
    /// until the future completes.
    ///
    /// # Panics
    ///
    /// Panics if the compio runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        // The driver type is only looked up if the log record is actually emitted.
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        Runtime::new().unwrap().block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let job = compio_runtime::spawn_blocking(f);
        JoinHandle {
            fut: Some(Either::Compio(job)),
        }
    }

    /// Spawns a future onto the current thread.
    ///
    /// No new Arbiter (or Arbiter address) is created; this is only a convenience for
    /// running a future on the thread that calls it. The thread's spawn hooks are
    /// applied: `before` is consulted right away, and if it returns a pointer, `enter`
    /// and `exit` wrap every poll and `after` runs once the future has completed.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());

        let task = compio_runtime::spawn(async move {
            match token {
                // `before` opted out: run the future without any hooks.
                None => f.await,
                Some(token) => {
                    let mut f = std::pin::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(entered));
                        polled
                    })
                    .await;
                    // Hand the pointer back now that the future is finished.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        });

        JoinHandle {
            fut: Some(Either::Compio(task)),
        }
    }

    /// Handle to the runtime, backed by the current [`crate::Arbiter`].
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Returns a handle wrapping the current arbiter.
        pub fn current() -> Self {
            let arbiter = crate::Arbiter::current();
            Self(arbiter)
        }

        /// Wake up runtime. This is a no-op for the compio backend.
        pub fn notify(&self) {}

        /// Spawns `future` on the wrapped arbiter.
        ///
        /// The future's output travels back through a oneshot channel; the returned
        /// [`JoinHandle`] waits on the receiving end of that channel.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                // A failed send means the receiver is gone; nobody wants the result.
                let _ = tx.send(future.await);
            });

            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
284
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::{
        fmt,
        future::{poll_fn, Future},
        pin::Pin,
        task::{ready, Context, Poll},
    };

    use ntex_neon::Runtime;

    /// Error returned by [`JoinHandle`] when the task's result cannot be obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// The two kinds of work a [`JoinHandle`] can wait on.
    enum Either<T> {
        /// Asynchronous task spawned on the neon runtime.
        Task(ntex_neon::Task<T>),
        /// Job submitted through `ntex_neon::spawn_blocking`.
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// A spawned task.
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    #[doc(hidden)]
    #[deprecated]
    pub type Task<T> = JoinHandle<T>;

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying handle considers the work finished.
        ///
        /// A task is asked via `is_finished`, a blocking job via `is_closed`; a handle
        /// without any inner source counts as finished.
        pub fn is_finished(&self) -> bool {
            match self.task.as_ref() {
                None => true,
                Some(Either::Task(task)) => task.is_finished(),
                Some(Either::Blocking(job)) => job.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        /// Detaches an asynchronous task on drop; a blocking job is simply dropped.
        fn drop(&mut self) {
            match self.task.take() {
                Some(Either::Task(task)) => {
                    task.detach();
                }
                Some(Either::Blocking(_)) | None => {}
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the inner handle.
        ///
        /// A task's output is passed through as `Ok`. A blocking job yields a nested
        /// result; an error at either level is collapsed into [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.task.as_mut() {
                None => Err(JoinError),
                Some(Either::Task(task)) => Ok(ready!(Pin::new(task).poll(cx))),
                Some(Either::Blocking(job)) => match ready!(Pin::new(job).poll(cx)) {
                    Ok(Ok(value)) => Ok(value),
                    Ok(Err(_)) | Err(_) => Err(JoinError),
                },
            };
            Poll::Ready(outcome)
        }
    }

    /// Runs the provided future on a new neon runtime, blocking the current thread
    /// until the future completes.
    ///
    /// # Panics
    ///
    /// Panics if the neon runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        // The driver name is only looked up if the log record is actually emitted.
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver_type().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns a future onto the current thread.
    ///
    /// No new Arbiter (or Arbiter address) is created; this is only a convenience for
    /// running a future on the thread that calls it. The thread's spawn hooks are
    /// applied: `before` is consulted right away, and if it returns a pointer, `enter`
    /// and `exit` wrap every poll and `after` runs once the future has completed.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());

        let task = ntex_neon::spawn(async move {
            match token {
                // `before` opted out: run the future without any hooks.
                None => f.await,
                Some(token) => {
                    let mut f = std::pin::pin!(f);
                    let output = poll_fn(|cx| {
                        let entered = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let polled = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(entered));
                        polled
                    })
                    .await;
                    // Hand the pointer back now that the future is finished.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        });

        JoinHandle {
            task: Some(Either::Task(task)),
        }
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let job = ntex_neon::spawn_blocking(f);
        JoinHandle {
            task: Some(Either::Blocking(job)),
        }
    }

    /// Handle to the neon runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Returns a handle to the neon runtime that is current for this thread.
        pub fn current() -> Self {
            let inner = ntex_neon::Handle::current();
            Self(inner)
        }

        /// Wake up runtime; whatever the underlying `notify` returns is discarded.
        pub fn notify(&self) {
            let _ = self.0.notify();
        }

        /// Spawns `future` through the wrapped runtime handle and returns a
        /// [`JoinHandle`] for it.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let task = self.0.spawn(future);
            JoinHandle {
                task: Some(Either::Task(task)),
            }
        }
    }
}
435
436#[cfg(feature = "tokio")]
437pub use self::tokio::*;
438
439#[cfg(feature = "compio")]
440pub use self::compio::*;
441
442#[cfg(feature = "neon")]
443pub use self::neon::*;
444
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    //! Fallback used when no async runtime feature is enabled.
    //!
    //! It mirrors the API surface of the real runtime modules so dependent code still
    //! compiles, but every operation that would need a runtime panics with the same
    //! "async runtime is not configured" message.
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Single failure path for every operation that needs a runtime.
    ///
    /// `#[track_caller]` keeps the reported panic location at the public entry point
    /// rather than inside this helper.
    #[cold]
    #[track_caller]
    fn not_configured() -> ! {
        panic!("async runtime is not configured");
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime is configured.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Spawns a future on the current thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime is configured.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Spawns a blocking task.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime is configured.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Placeholder for the join handle of a spawned task or blocking operation.
    ///
    /// Every function in this module that returns a `JoinHandle` panics before
    /// producing one, so no value of this type is ever created here.
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `true`; without a runtime there is no pending work.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// # Panics
        ///
        /// Always panics: no async runtime is configured, so there is nothing to poll.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            not_configured()
        }
    }

    /// Error type of [`JoinHandle`]'s output.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle;

    impl Handle {
        /// Returns a handle; it carries no state because there is no runtime.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        /// Wake up runtime. This is a no-op.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
        ///
        /// # Panics
        ///
        /// Always panics: no async runtime is configured.
        #[inline]
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            not_configured()
        }
    }
}
532
533#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
534pub use self::no_rt::*;