ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
15        Box::new(|| {None}), Box::new(|_| {ptr::null()}), Box::new(|_| {}), Box::new(|_| {}))
16    );
17}
18
19type TBefore = Box<dyn Fn() -> Option<*const ()>>;
20type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
21type TExit = Box<dyn Fn(*const ())>;
22type TAfter = Box<dyn Fn(*const ())>;
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
51#[allow(dead_code)]
52#[cfg(feature = "tokio")]
53mod tokio {
54    use std::future::{poll_fn, Future};
55    use tok_io::runtime::Handle;
56    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
57
58    /// Runs the provided future, blocking the current thread until the future
59    /// completes.
60    pub fn block_on<F: Future<Output = ()>>(fut: F) {
61        if let Ok(hnd) = Handle::try_current() {
62            log::debug!("Use existing tokio runtime and block on future");
63            hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
64        } else {
65            log::debug!("Create tokio runtime and block on future");
66
67            let rt = tok_io::runtime::Builder::new_current_thread()
68                .enable_all()
69                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
70                .build()
71                .unwrap();
72            tok_io::task::LocalSet::new().block_on(&rt, fut);
73        }
74    }
75
76    /// Spawn a future on the current thread. This does not create a new Arbiter
77    /// or Arbiter address, it is simply a helper for spawning futures on the current
78    /// thread.
79    ///
80    /// # Panics
81    ///
82    /// This function panics if ntex system is not running.
83    #[inline]
84    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
85    where
86        F: Future + 'static,
87    {
88        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
89        tok_io::task::spawn_local(async move {
90            if let Some(ptr) = ptr {
91                tok_io::pin!(f);
92                let result = poll_fn(|ctx| {
93                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
94                    let result = f.as_mut().poll(ctx);
95                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
96                    result
97                })
98                .await;
99                crate::CB.with(|cb| (cb.borrow().3)(ptr));
100                result
101            } else {
102                f.await
103            }
104        })
105    }
106
107    /// Executes a future on the current thread. This does not create a new Arbiter
108    /// or Arbiter address, it is simply a helper for executing futures on the current
109    /// thread.
110    ///
111    /// # Panics
112    ///
113    /// This function panics if ntex system is not running.
114    #[inline]
115    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
116    where
117        F: FnOnce() -> R + 'static,
118        R: Future + 'static,
119    {
120        spawn(async move { f().await })
121    }
122}
123
124#[allow(dead_code)]
125#[cfg(feature = "compio")]
126mod compio {
127    use std::task::{ready, Context, Poll};
128    use std::{fmt, future::poll_fn, future::Future, pin::Pin};
129
130    use compio_runtime::Runtime;
131
132    /// Runs the provided future, blocking the current thread until the future
133    /// completes.
134    pub fn block_on<F: Future<Output = ()>>(fut: F) {
135        log::info!(
136            "Starting compio runtime, driver {:?}",
137            compio_driver::DriverType::current()
138        );
139        let rt = Runtime::new().unwrap();
140        rt.block_on(fut);
141    }
142
143    /// Spawns a blocking task.
144    ///
145    /// The task will be spawned onto a thread pool specifically dedicated
146    /// to blocking tasks. This is useful to prevent long-running synchronous
147    /// operations from blocking the main futures executor.
148    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
149    where
150        F: FnOnce() -> T + Send + Sync + 'static,
151        T: Send + 'static,
152    {
153        JoinHandle {
154            fut: Some(compio_runtime::spawn_blocking(f)),
155        }
156    }
157
158    /// Spawn a future on the current thread. This does not create a new Arbiter
159    /// or Arbiter address, it is simply a helper for spawning futures on the current
160    /// thread.
161    ///
162    /// # Panics
163    ///
164    /// This function panics if ntex system is not running.
165    #[inline]
166    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
167    where
168        F: Future + 'static,
169    {
170        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
171        let fut = compio_runtime::spawn(async move {
172            if let Some(ptr) = ptr {
173                let mut f = std::pin::pin!(f);
174                let result = poll_fn(|ctx| {
175                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
176                    let result = f.as_mut().poll(ctx);
177                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
178                    result
179                })
180                .await;
181                crate::CB.with(|cb| (cb.borrow().3)(ptr));
182                result
183            } else {
184                f.await
185            }
186        });
187
188        JoinHandle { fut: Some(fut) }
189    }
190
191    /// Executes a future on the current thread. This does not create a new Arbiter
192    /// or Arbiter address, it is simply a helper for executing futures on the current
193    /// thread.
194    ///
195    /// # Panics
196    ///
197    /// This function panics if ntex system is not running.
198    #[inline]
199    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
200    where
201        F: FnOnce() -> R + 'static,
202        R: Future + 'static,
203    {
204        spawn(async move { f().await })
205    }
206
207    #[derive(Debug, Copy, Clone)]
208    pub struct JoinError;
209
210    impl fmt::Display for JoinError {
211        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212            write!(f, "JoinError")
213        }
214    }
215
216    impl std::error::Error for JoinError {}
217
218    pub struct JoinHandle<T> {
219        fut: Option<compio_runtime::JoinHandle<T>>,
220    }
221
222    impl<T> JoinHandle<T> {
223        pub fn is_finished(&self) -> bool {
224            if let Some(hnd) = &self.fut {
225                hnd.is_finished()
226            } else {
227                true
228            }
229        }
230    }
231
232    impl<T> Drop for JoinHandle<T> {
233        fn drop(&mut self) {
234            self.fut.take().unwrap().detach();
235        }
236    }
237
238    impl<T> Future for JoinHandle<T> {
239        type Output = Result<T, JoinError>;
240
241        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242            Poll::Ready(
243                ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
244                    .map_err(|_| JoinError),
245            )
246        }
247    }
248}
249
250#[allow(dead_code)]
251#[cfg(feature = "neon")]
252mod neon {
253    use std::task::{ready, Context, Poll};
254    use std::{fmt, future::poll_fn, future::Future, pin::Pin};
255
256    use ntex_neon::Runtime;
257
258    /// Runs the provided future, blocking the current thread until the future
259    /// completes.
260    pub fn block_on<F: Future<Output = ()>>(fut: F) {
261        log::info!(
262            "Starting neon runtime, driver {:?}",
263            ntex_neon::driver::DriverType::current()
264        );
265        let rt = Runtime::new().unwrap();
266        rt.block_on(fut);
267    }
268
269    /// Spawns a blocking task.
270    ///
271    /// The task will be spawned onto a thread pool specifically dedicated
272    /// to blocking tasks. This is useful to prevent long-running synchronous
273    /// operations from blocking the main futures executor.
274    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
275    where
276        F: FnOnce() -> T + Send + Sync + 'static,
277        T: Send + 'static,
278    {
279        JoinHandle {
280            fut: Some(ntex_neon::spawn_blocking(f)),
281        }
282    }
283
284    /// Spawn a future on the current thread. This does not create a new Arbiter
285    /// or Arbiter address, it is simply a helper for spawning futures on the current
286    /// thread.
287    ///
288    /// # Panics
289    ///
290    /// This function panics if ntex system is not running.
291    #[inline]
292    pub fn spawn<F>(f: F) -> Task<F::Output>
293    where
294        F: Future + 'static,
295    {
296        let ptr = crate::CB.with(|cb| (cb.borrow().0)());
297        let task = ntex_neon::spawn(async move {
298            if let Some(ptr) = ptr {
299                let mut f = std::pin::pin!(f);
300                let result = poll_fn(|ctx| {
301                    let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
302                    let result = f.as_mut().poll(ctx);
303                    crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
304                    result
305                })
306                .await;
307                crate::CB.with(|cb| (cb.borrow().3)(ptr));
308                result
309            } else {
310                f.await
311            }
312        });
313
314        Task { task: Some(task) }
315    }
316
317    /// Executes a future on the current thread. This does not create a new Arbiter
318    /// or Arbiter address, it is simply a helper for executing futures on the current
319    /// thread.
320    ///
321    /// # Panics
322    ///
323    /// This function panics if ntex system is not running.
324    #[inline]
325    pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
326    where
327        F: FnOnce() -> R + 'static,
328        R: Future + 'static,
329    {
330        spawn(async move { f().await })
331    }
332
333    /// A spawned task.
334    pub struct Task<T> {
335        task: Option<ntex_neon::Task<T>>,
336    }
337
338    impl<T> Task<T> {
339        pub fn is_finished(&self) -> bool {
340            if let Some(hnd) = &self.task {
341                hnd.is_finished()
342            } else {
343                true
344            }
345        }
346    }
347
348    impl<T> Drop for Task<T> {
349        fn drop(&mut self) {
350            self.task.take().unwrap().detach();
351        }
352    }
353
354    impl<T> Future for Task<T> {
355        type Output = Result<T, JoinError>;
356
357        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
358            Poll::Ready(Ok(ready!(Pin::new(self.task.as_mut().unwrap()).poll(cx))))
359        }
360    }
361
362    #[derive(Debug, Copy, Clone)]
363    pub struct JoinError;
364
365    impl fmt::Display for JoinError {
366        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367            write!(f, "JoinError")
368        }
369    }
370
371    impl std::error::Error for JoinError {}
372
373    pub struct JoinHandle<T> {
374        fut: Option<ntex_neon::JoinHandle<T>>,
375    }
376
377    impl<T> JoinHandle<T> {
378        pub fn is_finished(&self) -> bool {
379            if let Some(hnd) = &self.fut {
380                hnd.is_finished()
381            } else {
382                true
383            }
384        }
385    }
386
387    impl<T> Drop for JoinHandle<T> {
388        fn drop(&mut self) {
389            self.fut.take().unwrap().detach();
390        }
391    }
392
393    impl<T> Future for JoinHandle<T> {
394        type Output = Result<T, JoinError>;
395
396        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
397            Poll::Ready(
398                ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
399                    .map_err(|_| JoinError),
400            )
401        }
402    }
403}
404
405#[cfg(feature = "tokio")]
406pub use self::tokio::*;
407
408#[cfg(feature = "compio")]
409pub use self::compio::*;
410
411#[cfg(feature = "neon")]
412pub use self::neon::*;
413
414#[allow(dead_code)]
415#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
416mod no_rt {
417    use std::task::{Context, Poll};
418    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
419
420    /// Runs the provided future, blocking the current thread until the future
421    /// completes.
422    pub fn block_on<F: Future<Output = ()>>(_: F) {
423        panic!("async runtime is not configured");
424    }
425
426    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
427    where
428        F: Future + 'static,
429    {
430        unimplemented!()
431    }
432
433    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
434    where
435        F: FnOnce() -> T + Send + Sync + 'static,
436        T: Send + 'static,
437    {
438        unimplemented!()
439    }
440
441    /// Blocking operation completion future. It resolves with results
442    /// of blocking function execution.
443    #[allow(clippy::type_complexity)]
444    pub struct JoinHandle<T> {
445        t: PhantomData<T>,
446    }
447
448    impl<T> JoinHandle<T> {
449        pub fn is_finished(&self) -> bool {
450            true
451        }
452    }
453
454    impl<T> Future for JoinHandle<T> {
455        type Output = Result<T, JoinError>;
456
457        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
458            todo!()
459        }
460    }
461
462    #[derive(Debug, Copy, Clone)]
463    pub struct JoinError;
464
465    impl fmt::Display for JoinError {
466        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
467            write!(f, "JoinError")
468        }
469    }
470
471    impl std::error::Error for JoinError {}
472}
473
474#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
475pub use self::no_rt::*;