ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
/// Hook run by `spawn` on the spawning side, before the task is created.
///
/// Returning `None` makes `spawn` run the future without any further hooks;
/// returning `Some(ptr)` hands that opaque pointer to the remaining hooks.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook run right before every poll of a spawned future. It receives the pointer
/// produced by [`TBefore`]; the pointer it returns is passed to [`TExit`].
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook run right after every poll, with the pointer returned by [`TEnter`].
type TExit = Box<dyn Fn(*const ())>;
/// Hook run once after the spawned future has completed, with the pointer
/// originally produced by [`TBefore`].
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Per-thread spawn hooks in the order `(before, enter, exit, after)`.
    ///
    /// The defaults are inert: `before` yields `None`, `enter` yields a null
    /// pointer, `exit` and `after` do nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
        Box::new(|| None),
        Box::new(|_| ptr::null()),
        Box::new(|_| ()),
        Box::new(|_| ()),
    ));
}
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    use tok_io::runtime::Handle;
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Drives `fut` to completion, blocking the calling thread meanwhile.
    ///
    /// If a tokio runtime is already active on this thread its handle is reused;
    /// otherwise a new current-thread runtime with all drivers enabled is built.
    /// Either way the future runs inside a freshly created `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime is required and building it fails.
    pub fn block_on<F>(fut: F)
    where
        F: Future<Output = ()>,
    {
        match Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns a future as a local task on the current thread. No new Arbiter or
    /// Arbiter address is created; this is only a helper for running futures on
    /// the thread that calls it.
    ///
    /// The thread's `before` hook is invoked first. When it yields a pointer, every
    /// poll of `f` is bracketed by the `enter`/`exit` hooks and the `after` hook
    /// runs once `f` has completed; when it yields `None`, `f` runs unwrapped.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());
        tok_io::task::spawn_local(async move {
            match token {
                // No hook context was requested for this task.
                None => f.await,
                Some(token) => {
                    tok_io::pin!(f);
                    let output = poll_fn(|cx| {
                        let scope = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let state = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(scope));
                        state
                    })
                    .await;
                    // Hand the pointer back to the user now that the task is done.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        })
    }

    /// Calls `f` inside a newly spawned local task and awaits the future it
    /// returns. No new Arbiter or Arbiter address is created; this is only a
    /// helper for running futures on the thread that calls it.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move {
            let fut = f();
            fut.await
        })
    }
}
125
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Drives `fut` to completion on a newly created compio runtime, blocking the
    /// calling thread meanwhile. The current driver type is logged at `info` level
    /// before the runtime is created.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F>(fut: F)
    where
        F: Future<Output = ()>,
    {
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        let runtime = Runtime::new().unwrap();
        runtime.block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let inner = compio_runtime::spawn_blocking(f);
        JoinHandle { fut: Some(inner) }
    }

    /// Spawns a future on the current thread. No new Arbiter or Arbiter address is
    /// created; this is only a helper for running futures on the thread that
    /// calls it.
    ///
    /// The thread's `before` hook is invoked first. When it yields a pointer, every
    /// poll of `f` is bracketed by the `enter`/`exit` hooks and the `after` hook
    /// runs once `f` has completed; when it yields `None`, `f` runs unwrapped.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|hooks| (hooks.borrow().0)());
        let inner = compio_runtime::spawn(async move {
            match token {
                // No hook context was requested for this task.
                None => f.await,
                Some(token) => {
                    let mut f = std::pin::pin!(f);
                    let output = poll_fn(|cx| {
                        let scope = crate::CB.with(|hooks| (hooks.borrow().1)(token));
                        let state = f.as_mut().poll(cx);
                        crate::CB.with(|hooks| (hooks.borrow().2)(scope));
                        state
                    })
                    .await;
                    // Hand the pointer back to the user now that the task is done.
                    crate::CB.with(|hooks| (hooks.borrow().3)(token));
                    output
                }
            }
        });

        JoinHandle { fut: Some(inner) }
    }

    /// Calls `f` inside a newly spawned task and awaits the future it returns. No
    /// new Arbiter or Arbiter address is created; this is only a helper for
    /// running futures on the thread that calls it.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move {
            let fut = f();
            fut.await
        })
    }

    /// Error produced when awaiting a [`JoinHandle`] whose underlying task did not
    /// yield a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a task started with [`spawn`] or [`spawn_blocking`].
    ///
    /// Awaiting it yields the task's output; dropping it detaches the task.
    pub struct JoinHandle<T> {
        fut: Option<compio_runtime::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task has finished. A handle without an
        /// inner task is reported as finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(task) => task.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Detach the inner task instead of letting the inner handle's own
            // drop run on it.
            let inner = self.fut.take().unwrap();
            inner.detach();
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let inner = Pin::new(self.fut.as_mut().unwrap());
            match inner.poll(cx) {
                Poll::Pending => Poll::Pending,
                // Any failure reported by the inner handle collapses to `JoinError`.
                Poll::Ready(outcome) => Poll::Ready(outcome.map_err(|_| JoinError)),
            }
        }
    }
}
253
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// A new neon `Runtime` is created for the call and the name of its driver is
    /// logged at `info` level before the future is run.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let rt = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            rt.driver_type().name()
        );

        rt.block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(ntex_neon::spawn_blocking(f)),
        }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// The thread's `before` hook is invoked first. When it yields a pointer, every
    /// poll of `f` is bracketed by the `enter`/`exit` hooks and the `after` hook
    /// runs once `f` has completed; when it yields `None`, `f` runs unwrapped.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> Task<F::Output>
    where
        F: Future + 'static,
    {
        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
        let task = ntex_neon::spawn(async move {
            if let Some(ptr) = ptr {
                let mut f = std::pin::pin!(f);
                let result = poll_fn(|ctx| {
                    // `enter` may return a different pointer; that one goes to `exit`.
                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                    let result = f.as_mut().poll(ctx);
                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
                    result
                })
                .await;
                // Hand the original pointer back to the user once the task is done.
                crate::CB.with(|cb| (cb.borrow().3)(ptr));
                result
            } else {
                f.await
            }
        });

        Task { task: Some(task) }
    }

    /// Executes a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for executing futures on the current
    /// thread.
    ///
    /// `f` is called inside the spawned task and the future it returns is awaited
    /// there.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    #[doc(hidden)]
    #[deprecated]
    pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move { f().await })
    }

    /// A spawned task.
    ///
    /// Awaiting it yields `Ok` with the task's output; dropping it detaches the
    /// task.
    pub struct Task<T> {
        task: Option<ntex_neon::Task<T>>,
    }

    impl<T> Task<T> {
        /// Reports whether the underlying task has finished. A handle without an
        /// inner task is reported as finished.
        pub fn is_finished(&self) -> bool {
            if let Some(hnd) = &self.task {
                hnd.is_finished()
            } else {
                true
            }
        }
    }

    impl<T> Drop for Task<T> {
        fn drop(&mut self) {
            // Detach the inner task instead of letting the inner handle's own
            // drop run on it.
            self.task.take().unwrap().detach();
        }
    }

    impl<T> Future for Task<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(Ok(ready!(Pin::new(self.task.as_mut().unwrap()).poll(cx))))
        }
    }

    /// Error produced when awaiting a [`JoinHandle`] whose blocking task did not
    /// yield a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a blocking task started with [`spawn_blocking`].
    pub struct JoinHandle<T> {
        // Cleared by `poll` once the result has been delivered, which is what
        // `is_finished` observes.
        fut: Option<ntex_neon::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` once this handle has been polled to completion.
        ///
        /// The underlying task is not probed: until the handle itself has
        /// produced its result this returns `false`.
        pub fn is_finished(&self) -> bool {
            self.fut.is_none()
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The result was already handed out; report an error rather than
            // polling a finished handle again.
            let Some(fut) = self.fut.as_mut() else {
                return Poll::Ready(Err(JoinError));
            };
            let outcome = ready!(Pin::new(fut).poll(cx));
            // Drop the inner handle so `is_finished` starts returning `true`.
            self.fut = None;
            Poll::Ready(
                outcome
                    .map_err(|_| JoinError)
                    .and_then(|result| result.map_err(|_| JoinError)),
            )
        }
    }
}
402
403#[cfg(feature = "tokio")]
404pub use self::tokio::*;
405
406#[cfg(feature = "compio")]
407pub use self::compio::*;
408
409#[cfg(feature = "neon")]
410pub use self::neon::*;
411
#[allow(dead_code)]
#[cfg(not(any(feature = "tokio", feature = "compio", feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Stand-in for the runtime entry point when no runtime feature is enabled.
    ///
    /// # Panics
    ///
    /// Always panics: there is no runtime to drive the future.
    pub fn block_on<F>(_fut: F)
    where
        F: Future<Output = ()>,
    {
        panic!("async runtime is not configured");
    }

    /// Stand-in for task spawning when no runtime feature is enabled.
    ///
    /// # Panics
    ///
    /// Always panics.
    pub fn spawn<F>(_fut: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        unimplemented!()
    }

    /// Stand-in for blocking-task spawning when no runtime feature is enabled.
    ///
    /// # Panics
    ///
    /// Always panics.
    pub fn spawn_blocking<F, T>(_work: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        unimplemented!()
    }

    /// Placeholder join handle that mirrors the shape of the handles provided by
    /// the real runtimes. Nothing in this module constructs one, because both
    /// spawn functions panic.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports the (non-existent) task as finished.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Not implemented; polling panics.
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            todo!()
        }
    }

    /// Error type matching the `JoinError` exposed by the real runtimes.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
471
472#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
473pub use self::no_rt::*;