ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::System;
12
13thread_local! {
14    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
15        Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}), Box::new(|_| {}))
16    );
17}
18
19type TBefore = Box<dyn Fn() -> Option<*const ()>>;
20type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
21type TExit = Box<dyn Fn(*const ())>;
22type TAfter = Box<dyn Fn(*const ())>;
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
51#[allow(dead_code)]
52#[cfg(feature = "tokio")]
53mod tokio {
54    use std::future::{poll_fn, Future};
55    use tok_io::runtime::Handle;
56    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
57
58    /// Runs the provided future, blocking the current thread until the future
59    /// completes.
60    pub fn block_on<F: Future<Output = ()>>(fut: F) {
61        if let Ok(hnd) = Handle::try_current() {
62            log::debug!("Use existing tokio runtime and block on future");
63            hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
64        } else {
65            log::debug!("Create tokio runtime and block on future");
66
67            let rt = tok_io::runtime::Builder::new_current_thread()
68                .enable_all()
69                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
70                .build()
71                .unwrap();
72            tok_io::task::LocalSet::new().block_on(&rt, fut);
73        }
74    }
75
76    /// Spawn a future on the current thread. This does not create a new Arbiter
77    /// or Arbiter address, it is simply a helper for spawning futures on the current
78    /// thread.
79    ///
80    /// # Panics
81    ///
82    /// This function panics if ntex system is not running.
83    #[inline]
84    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
85    where
86        F: Future + 'static,
87    {
88        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
89        tok_io::task::spawn_local(async move {
90            if let Some(ptr) = ptr {
91                tok_io::pin!(f);
92                let result = poll_fn(|ctx| {
93                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
94                    let result = f.as_mut().poll(ctx);
95                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
96                    result
97                })
98                .await;
99                crate::CB.with(|cb| (cb.borrow().3)(ptr));
100                result
101            } else {
102                f.await
103            }
104        })
105    }
106
107    /// Executes a future on the current thread. This does not create a new Arbiter
108    /// or Arbiter address, it is simply a helper for executing futures on the current
109    /// thread.
110    ///
111    /// # Panics
112    ///
113    /// This function panics if ntex system is not running.
114    #[inline]
115    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
116    where
117        F: FnOnce() -> R + 'static,
118        R: Future + 'static,
119    {
120        spawn(async move { f().await })
121    }
122}
123
124#[allow(dead_code)]
125#[cfg(feature = "compio")]
126mod compio {
127    use std::task::{ready, Context, Poll};
128    use std::{fmt, future::poll_fn, future::Future, pin::Pin};
129
130    use compio_runtime::Runtime;
131
132    /// Runs the provided future, blocking the current thread until the future
133    /// completes.
134    pub fn block_on<F: Future<Output = ()>>(fut: F) {
135        log::info!(
136            "Starting compio runtime, driver {:?}",
137            compio_driver::DriverType::current()
138        );
139        let rt = Runtime::new().unwrap();
140        rt.block_on(fut);
141    }
142
143    /// Spawns a blocking task.
144    ///
145    /// The task will be spawned onto a thread pool specifically dedicated
146    /// to blocking tasks. This is useful to prevent long-running synchronous
147    /// operations from blocking the main futures executor.
148    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
149    where
150        F: FnOnce() -> T + Send + Sync + 'static,
151        T: Send + 'static,
152    {
153        JoinHandle {
154            fut: Some(compio_runtime::spawn_blocking(f)),
155        }
156    }
157
158    /// Spawn a future on the current thread. This does not create a new Arbiter
159    /// or Arbiter address, it is simply a helper for spawning futures on the current
160    /// thread.
161    ///
162    /// # Panics
163    ///
164    /// This function panics if ntex system is not running.
165    #[inline]
166    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
167    where
168        F: Future + 'static,
169    {
170        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
171        let fut = compio_runtime::spawn(async move {
172            if let Some(ptr) = ptr {
173                let mut f = std::pin::pin!(f);
174                let result = poll_fn(|ctx| {
175                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
176                    let result = f.as_mut().poll(ctx);
177                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
178                    result
179                })
180                .await;
181                crate::CB.with(|cb| (cb.borrow().3)(ptr));
182                result
183            } else {
184                f.await
185            }
186        });
187
188        JoinHandle { fut: Some(fut) }
189    }
190
191    /// Executes a future on the current thread. This does not create a new Arbiter
192    /// or Arbiter address, it is simply a helper for executing futures on the current
193    /// thread.
194    ///
195    /// # Panics
196    ///
197    /// This function panics if ntex system is not running.
198    #[inline]
199    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
200    where
201        F: FnOnce() -> R + 'static,
202        R: Future + 'static,
203    {
204        spawn(async move { f().await })
205    }
206
207    #[derive(Debug, Copy, Clone)]
208    pub struct JoinError;
209
210    impl fmt::Display for JoinError {
211        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212            write!(f, "JoinError")
213        }
214    }
215
216    impl std::error::Error for JoinError {}
217
218    pub struct JoinHandle<T> {
219        fut: Option<compio_runtime::JoinHandle<T>>,
220    }
221
222    impl<T> JoinHandle<T> {
223        pub fn is_finished(&self) -> bool {
224            if let Some(hnd) = &self.fut {
225                hnd.is_finished()
226            } else {
227                true
228            }
229        }
230    }
231
232    impl<T> Drop for JoinHandle<T> {
233        fn drop(&mut self) {
234            self.fut.take().unwrap().detach();
235        }
236    }
237
238    impl<T> Future for JoinHandle<T> {
239        type Output = Result<T, JoinError>;
240
241        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242            Poll::Ready(
243                ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
244                    .map_err(|_| JoinError),
245            )
246        }
247    }
248}
249
250#[allow(dead_code)]
251#[cfg(feature = "async-std")]
252mod asyncstd {
253    use std::future::{poll_fn, Future};
254    use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll};
255
256    /// Runs the provided future, blocking the current thread until the future
257    /// completes.
258    pub fn block_on<F: Future<Output = ()>>(fut: F) {
259        async_std::task::block_on(fut);
260    }
261
262    /// Spawn a future on the current thread. This does not create a new Arbiter
263    /// or Arbiter address, it is simply a helper for spawning futures on the current
264    /// thread.
265    ///
266    /// # Panics
267    ///
268    /// This function panics if ntex system is not running.
269    #[inline]
270    pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
271    where
272        F: Future + 'static,
273    {
274        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
275        JoinHandle {
276            fut: async_std::task::spawn_local(async move {
277                if let Some(ptr) = ptr {
278                    let mut f = unsafe { Pin::new_unchecked(&mut f) };
279                    let result = poll_fn(|ctx| {
280                        let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
281                        let result = f.as_mut().poll(ctx);
282                        crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
283                        result
284                    })
285                    .await;
286                    crate::CB.with(|cb| (cb.borrow().3)(ptr));
287                    result
288                } else {
289                    f.await
290                }
291            }),
292        }
293    }
294
295    /// Executes a future on the current thread. This does not create a new Arbiter
296    /// or Arbiter address, it is simply a helper for executing futures on the current
297    /// thread.
298    ///
299    /// # Panics
300    ///
301    /// This function panics if ntex system is not running.
302    #[inline]
303    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
304    where
305        F: FnOnce() -> R + 'static,
306        R: Future + 'static,
307    {
308        spawn(async move { f().await })
309    }
310
311    /// Spawns a blocking task.
312    ///
313    /// The task will be spawned onto a thread pool specifically dedicated
314    /// to blocking tasks. This is useful to prevent long-running synchronous
315    /// operations from blocking the main futures executor.
316    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
317    where
318        F: FnOnce() -> T + Send + 'static,
319        T: Send + 'static,
320    {
321        JoinHandle {
322            fut: async_std::task::spawn_blocking(f),
323        }
324    }
325
326    #[derive(Debug, Copy, Clone)]
327    pub struct JoinError;
328
329    impl fmt::Display for JoinError {
330        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331            write!(f, "JoinError")
332        }
333    }
334
335    impl std::error::Error for JoinError {}
336
337    pub struct JoinHandle<T> {
338        fut: async_std::task::JoinHandle<T>,
339    }
340
341    impl<T> Future for JoinHandle<T> {
342        type Output = Result<T, JoinError>;
343
344        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
345            Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
346        }
347    }
348}
349
350#[allow(dead_code)]
351#[cfg(all(feature = "glommio", target_os = "linux"))]
352mod glommio {
353    use std::future::{poll_fn, Future};
354    use std::{pin::Pin, task::Context, task::Poll};
355
356    use futures_channel::oneshot::Canceled;
357    use glomm_io::task;
358
359    pub type JoinError = Canceled;
360
361    /// Runs the provided future, blocking the current thread until the future
362    /// completes.
363    pub fn block_on<F: Future<Output = ()>>(fut: F) {
364        let ex = glomm_io::LocalExecutor::default();
365        ex.run(async move {
366            let _ = fut.await;
367        })
368    }
369
370    /// Spawn a future on the current thread. This does not create a new Arbiter
371    /// or Arbiter address, it is simply a helper for spawning futures on the current
372    /// thread.
373    ///
374    /// # Panics
375    ///
376    /// This function panics if ntex system is not running.
377    #[inline]
378    pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
379    where
380        F: Future + 'static,
381        F::Output: 'static,
382    {
383        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
384        JoinHandle {
385            fut: Either::Left(
386                glomm_io::spawn_local(async move {
387                    if let Some(ptr) = ptr {
388                        glomm_io::executor().yield_now().await;
389                        let mut f = unsafe { Pin::new_unchecked(&mut f) };
390                        let result = poll_fn(|ctx| {
391                            let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
392                            let result = f.as_mut().poll(ctx);
393                            crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
394                            result
395                        })
396                        .await;
397                        crate::CB.with(|cb| (cb.borrow().3)(ptr));
398                        result
399                    } else {
400                        glomm_io::executor().yield_now().await;
401                        f.await
402                    }
403                })
404                .detach(),
405            ),
406        }
407    }
408
409    /// Executes a future on the current thread. This does not create a new Arbiter
410    /// or Arbiter address, it is simply a helper for executing futures on the current
411    /// thread.
412    ///
413    /// # Panics
414    ///
415    /// This function panics if ntex system is not running.
416    #[inline]
417    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
418    where
419        F: FnOnce() -> R + 'static,
420        R: Future + 'static,
421    {
422        spawn(async move { f().await })
423    }
424
425    pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
426    where
427        F: FnOnce() -> R + Send + 'static,
428        R: Send + 'static,
429    {
430        let fut = glomm_io::executor().spawn_blocking(f);
431        JoinHandle {
432            fut: Either::Right(Box::pin(async move { Ok(fut.await) })),
433        }
434    }
435
436    enum Either<T1, T2> {
437        Left(T1),
438        Right(T2),
439    }
440
441    /// Blocking operation completion future. It resolves with results
442    /// of blocking function execution.
443    #[allow(clippy::type_complexity)]
444    pub struct JoinHandle<T> {
445        fut:
446            Either<task::JoinHandle<T>, Pin<Box<dyn Future<Output = Result<T, Canceled>>>>>,
447    }
448
449    impl<T> Future for JoinHandle<T> {
450        type Output = Result<T, Canceled>;
451
452        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
453            match self.fut {
454                Either::Left(ref mut f) => match Pin::new(f).poll(cx) {
455                    Poll::Pending => Poll::Pending,
456                    Poll::Ready(res) => Poll::Ready(res.ok_or(Canceled)),
457                },
458                Either::Right(ref mut f) => Pin::new(f).poll(cx),
459            }
460        }
461    }
462}
463
464#[cfg(feature = "tokio")]
465pub use self::tokio::*;
466
467#[cfg(feature = "async-std")]
468pub use self::asyncstd::*;
469
470#[cfg(feature = "glommio")]
471pub use self::glommio::*;
472
473#[cfg(feature = "compio")]
474pub use self::compio::*;
475
476#[allow(dead_code)]
477#[cfg(all(
478    not(feature = "tokio"),
479    not(feature = "async-std"),
480    not(feature = "compio"),
481    not(feature = "glommio")
482))]
483mod no_rt {
484    use std::task::{Context, Poll};
485    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
486
487    /// Runs the provided future, blocking the current thread until the future
488    /// completes.
489    pub fn block_on<F: Future<Output = ()>>(_: F) {
490        panic!("async runtime is not configured");
491    }
492
493    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
494    where
495        F: Future + 'static,
496    {
497        unimplemented!()
498    }
499
500    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
501    where
502        F: FnOnce() -> T + Send + Sync + 'static,
503        T: Send + 'static,
504    {
505        unimplemented!()
506    }
507
508    /// Blocking operation completion future. It resolves with results
509    /// of blocking function execution.
510    #[allow(clippy::type_complexity)]
511    pub struct JoinHandle<T> {
512        t: PhantomData<T>,
513    }
514
515    impl<T> JoinHandle<T> {
516        pub fn is_finished(&self) -> bool {
517            true
518        }
519    }
520
521    impl<T> Future for JoinHandle<T> {
522        type Output = Result<T, JoinError>;
523
524        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
525            todo!()
526        }
527    }
528
529    #[derive(Debug, Copy, Clone)]
530    pub struct JoinError;
531
532    impl fmt::Display for JoinError {
533        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534            write!(f, "JoinError")
535        }
536    }
537
538    impl std::error::Error for JoinError {}
539}
540
541#[cfg(all(
542    not(feature = "tokio"),
543    not(feature = "async-std"),
544    not(feature = "compio"),
545    not(feature = "glommio")
546))]
547pub use self::no_rt::*;