ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
thread_local! {
    /// Spawn hooks installed for the current thread (see `spawn_cbs`).
    ///
    /// The tuple stores, in order, the `before`, `enter`, `exit` and `after`
    /// callbacks. Until `spawn_cbs` is called the hooks are inert: `before`
    /// yields `None`, `enter` yields a null pointer and `exit`/`after` do
    /// nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = RefCell::new((
        Box::new(|| None),
        Box::new(|_| ptr::null()),
        Box::new(|_| {}),
        Box::new(|_| {}),
    ));
}

/// Hook invoked once by `spawn` before the task is created. Returning `None`
/// means the remaining hooks are not called for that task.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook invoked right before each poll of a spawned future. It receives the
/// pointer produced by `before`; its return value is handed to the `exit` hook.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook invoked right after each poll, with the pointer returned by `enter`.
type TExit = Box<dyn Fn(*const ())>;
/// Hook invoked once the spawned future has completed, with the pointer
/// produced by `before`.
type TAfter = Box<dyn Fn(*const ())>;
23
24/// # Safety
25///
26/// The user must ensure that the pointer returned by `before` is `'static`. It will become
27/// owned by the spawned task for the life of the task. Ownership of the pointer will be
28/// returned to the user at the end of the task via `after`. The pointer is opaque to the
29/// runtime.
30pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31    before: FBefore,
32    enter: FEnter,
33    exit: FExit,
34    after: FAfter,
35) where
36    FBefore: Fn() -> Option<*const ()> + 'static,
37    FEnter: Fn(*const ()) -> *const () + 'static,
38    FExit: Fn(*const ()) + 'static,
39    FAfter: Fn(*const ()) + 'static,
40{
41    CB.with(|cb| {
42        *cb.borrow_mut() = (
43            Box::new(before),
44            Box::new(enter),
45            Box::new(exit),
46            Box::new(after),
47        );
48    });
49}
50
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{poll_fn, Future};
    use tok_io::runtime::Handle;
    pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};

    /// Drives `fut` to completion, blocking the current thread until it
    /// resolves.
    ///
    /// If a tokio runtime handle is already available on this thread it is
    /// reused; otherwise a new current-thread runtime with all drivers enabled
    /// is built. Either way the future runs inside a fresh `LocalSet`.
    ///
    /// # Panics
    ///
    /// Panics if a new runtime has to be built and the build fails.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
                    .build()
                    .unwrap();
                // Declared after `runtime` so the local set is dropped first.
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// The `before` hook is called synchronously, before the task is handed
    /// to `spawn_local`. When it yields a pointer, every poll of `f` is
    /// bracketed by the `enter`/`exit` hooks and `after` is called once `f`
    /// resolves. `after` is not called if the task is dropped before `f`
    /// completes.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|cb| (cb.borrow().0)());
        tok_io::task::spawn_local(async move {
            let ptr = match token {
                Some(ptr) => ptr,
                // No hooks requested for this task: run the future untouched.
                None => return f.await,
            };

            tok_io::pin!(f);
            let output = poll_fn(|cx| {
                let entered = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                let polled = f.as_mut().poll(cx);
                crate::CB.with(|cb| (cb.borrow().2)(entered));
                polled
            })
            .await;
            crate::CB.with(|cb| (cb.borrow().3)(ptr));
            output
        })
    }

    /// Executes a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for executing futures on the current
    /// thread.
    ///
    /// The closure `f` is called inside the spawned task, and the future it
    /// returns is awaited there.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move {
            let fut = f();
            fut.await
        })
    }
}
123
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use compio_runtime::Runtime;

    /// Drives `fut` to completion on a newly created compio runtime, blocking
    /// the current thread until it resolves.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        log::info!(
            "Starting compio runtime, driver {:?}",
            compio_driver::DriverType::current()
        );
        Runtime::new().unwrap().block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let handle = compio_runtime::spawn_blocking(f);
        JoinHandle { fut: Some(handle) }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// The `before` hook is called synchronously, before the task is handed
    /// to the runtime. When it yields a pointer, every poll of `f` is
    /// bracketed by the `enter`/`exit` hooks and `after` is called once `f`
    /// resolves. `after` is not called if the task is dropped before `f`
    /// completes.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|cb| (cb.borrow().0)());
        let handle = compio_runtime::spawn(async move {
            let ptr = match token {
                Some(ptr) => ptr,
                // No hooks requested for this task: run the future untouched.
                None => return f.await,
            };

            let mut pinned = std::pin::pin!(f);
            let output = poll_fn(|cx| {
                let entered = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                let polled = pinned.as_mut().poll(cx);
                crate::CB.with(|cb| (cb.borrow().2)(entered));
                polled
            })
            .await;
            crate::CB.with(|cb| (cb.borrow().3)(ptr));
            output
        });

        JoinHandle { fut: Some(handle) }
    }

    /// Executes a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for executing futures on the current
    /// thread.
    ///
    /// The closure `f` is called inside the spawned task, and the future it
    /// returns is awaited there.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move {
            let fut = f();
            fut.await
        })
    }

    /// Error produced when awaiting a [`JoinHandle`] whose task did not yield
    /// a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a task spawned on the compio runtime.
    ///
    /// Awaiting the handle yields the task's output; dropping it detaches the
    /// task instead of leaving the decision to the inner handle's own drop.
    pub struct JoinHandle<T> {
        // Wrapped in `Option` so `Drop` can move the inner handle out.
        fut: Option<compio_runtime::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task has finished. A handle whose
        /// inner task has already been taken counts as finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(handle) => handle.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            let handle = self.fut.take().unwrap();
            handle.detach();
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let inner = self.fut.as_mut().unwrap();
            // Any failure reported by the runtime is collapsed into `JoinError`.
            Pin::new(inner)
                .poll(cx)
                .map(|res| res.map_err(|_| JoinError))
        }
    }
}
249
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{ready, Context, Poll};
    use std::{fmt, future::poll_fn, future::Future, pin::Pin};

    use ntex_neon::Runtime;

    /// Drives `fut` to completion on a newly created neon runtime, blocking
    /// the current thread until it resolves.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver().tp().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let handle = ntex_neon::spawn_blocking(f);
        JoinHandle { fut: Some(handle) }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// The `before` hook is called synchronously, before the task is handed
    /// to the runtime. When it yields a pointer, every poll of `f` is
    /// bracketed by the `enter`/`exit` hooks and `after` is called once `f`
    /// resolves. `after` is not called if the task is dropped before `f`
    /// completes.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> Task<F::Output>
    where
        F: Future + 'static,
    {
        let token = crate::CB.with(|cb| (cb.borrow().0)());
        let task = ntex_neon::spawn(async move {
            let ptr = match token {
                Some(ptr) => ptr,
                // No hooks requested for this task: run the future untouched.
                None => return f.await,
            };

            let mut pinned = std::pin::pin!(f);
            let output = poll_fn(|cx| {
                let entered = crate::CB.with(|cb| (cb.borrow().1)(ptr));
                let polled = pinned.as_mut().poll(cx);
                crate::CB.with(|cb| (cb.borrow().2)(entered));
                polled
            })
            .await;
            crate::CB.with(|cb| (cb.borrow().3)(ptr));
            output
        });

        Task { task: Some(task) }
    }

    /// Executes a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for executing futures on the current
    /// thread.
    ///
    /// The closure `f` is called inside the spawned task, and the future it
    /// returns is awaited there.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn_fn<F, R>(f: F) -> Task<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        spawn(async move {
            let fut = f();
            fut.await
        })
    }

    /// A spawned task.
    ///
    /// Awaiting it yields the task's output wrapped in `Ok`; dropping it
    /// detaches the underlying task.
    pub struct Task<T> {
        // Wrapped in `Option` so `Drop` can move the inner task out.
        task: Option<ntex_neon::Task<T>>,
    }

    impl<T> Task<T> {
        /// Reports whether the underlying task has finished. A wrapper whose
        /// inner task has already been taken counts as finished.
        pub fn is_finished(&self) -> bool {
            match &self.task {
                Some(task) => task.is_finished(),
                None => true,
            }
        }
    }

    impl<T> Drop for Task<T> {
        fn drop(&mut self) {
            let task = self.task.take().unwrap();
            task.detach();
        }
    }

    impl<T> Future for Task<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let inner = self.task.as_mut().unwrap();
            // This wrapper never produces `Err`; the output is always `Ok`.
            Pin::new(inner).poll(cx).map(Ok)
        }
    }

    /// Error produced when awaiting a [`JoinHandle`] whose blocking task did
    /// not yield a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Handle to a blocking task started with [`spawn_blocking`].
    pub struct JoinHandle<T> {
        fut: Option<ntex_neon::JoinHandle<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `false`: this wrapper does not track completion.
        // NOTE(review): the compio counterpart forwards to the inner handle —
        // confirm whether `ntex_neon::JoinHandle` exposes an equivalent query.
        pub fn is_finished(&self) -> bool {
            false
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let inner = self.fut.as_mut().unwrap();
            // Both the outer and the nested failure are collapsed into `JoinError`.
            let flattened = match ready!(Pin::new(inner).poll(cx)) {
                Ok(nested) => nested.map_err(|_| JoinError),
                Err(_) => Err(JoinError),
            };
            Poll::Ready(flattened)
        }
    }
}
396
397#[cfg(feature = "tokio")]
398pub use self::tokio::*;
399
400#[cfg(feature = "compio")]
401pub use self::compio::*;
402
403#[cfg(feature = "neon")]
404pub use self::neon::*;
405
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Shared failure path for every entry point of this fallback module:
    /// no runtime feature was enabled, so nothing can be executed.
    #[cold]
    #[track_caller]
    fn not_configured() -> ! {
        panic!("async runtime is not configured")
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Spawn a future on the current thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Executes a future on the current thread.
    ///
    /// Provided so that the fallback module exposes the same set of functions
    /// as the runtime-backed modules.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn_fn<F, R>(_: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        not_configured()
    }

    /// Spawns a blocking task.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Blocking operation completion future. It resolves with results
    /// of blocking function execution.
    ///
    /// In this fallback module no function ever returns a value of this type
    /// (they all panic first); it exists so signatures match the runtime-backed
    /// modules.
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always `true`: there is no task that could still be running.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            // Nothing in this module constructs a `JoinHandle`, so this is
            // only a defensive panic with the same diagnostic as elsewhere.
            not_configured()
        }
    }

    /// Error type matching the `JoinError` of the runtime-backed modules.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
465
466#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
467pub use self::no_rt::*;