ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14    static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
15}
16
17struct Callbacks {
18    before: Box<dyn Fn() -> Option<*const ()>>,
19    enter: Box<dyn Fn(*const ()) -> *const ()>,
20    exit: Box<dyn Fn(*const ())>,
21    after: Box<dyn Fn(*const ())>,
22}
23
24struct Data {
25    cb: &'static Callbacks,
26    ptr: *const (),
27}
28
29impl Data {
30    fn load() -> Option<Data> {
31        let cb = CB.with(|cb| cb.get());
32
33        if let Some(cb) = unsafe { cb.as_ref() } {
34            if let Some(ptr) = (*cb.before)() {
35                return Some(Data { cb, ptr });
36            }
37        }
38        None
39    }
40
41    fn run<F, R>(&mut self, f: F) -> R
42    where
43        F: FnOnce() -> R,
44    {
45        let ptr = (*self.cb.enter)(self.ptr);
46        let result = f();
47        (*self.cb.exit)(ptr);
48        result
49    }
50}
51
52impl Drop for Data {
53    fn drop(&mut self) {
54        (*self.cb.after)(self.ptr)
55    }
56}
57
58/// # Safety
59///
60/// The user must ensure that the pointer returned by `before` has a `'static` lifetime.
61/// This pointer will be owned by the spawned task for the duration of that task, and
62/// ownership will be returned to the user at the end of the task via `after`.
63/// The pointer remains opaque to the runtime.
64///
65/// # Panics
66///
67/// Panics if spawn callbacks have already been set.
68pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69    before: FBefore,
70    enter: FEnter,
71    exit: FExit,
72    after: FAfter,
73) where
74    FBefore: Fn() -> Option<*const ()> + 'static,
75    FEnter: Fn(*const ()) -> *const () + 'static,
76    FExit: Fn(*const ()) + 'static,
77    FAfter: Fn(*const ()) + 'static,
78{
79    CB.with(|cb| {
80        let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
81            before: Box::new(before),
82            enter: Box::new(enter),
83            exit: Box::new(exit),
84            after: Box::new(after),
85        }));
86
87        if !cb.replace(new).is_null() {
88            panic!("Spawn callbacks already set");
89        }
90    });
91}
92
93#[allow(dead_code)]
94#[cfg(feature = "tokio")]
95mod tokio {
96    use std::future::{Future, poll_fn};
97    pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};
98
99    /// Runs the provided future, blocking the current thread until the future
100    /// completes.
101    pub fn block_on<F: Future<Output = ()>>(fut: F) {
102        if let Ok(hnd) = tok_io::runtime::Handle::try_current() {
103            log::debug!("Use existing tokio runtime and block on future");
104            hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
105        } else {
106            log::debug!("Create tokio runtime and block on future");
107
108            let rt = tok_io::runtime::Builder::new_current_thread()
109                .enable_all()
110                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
111                .build()
112                .unwrap();
113            tok_io::task::LocalSet::new().block_on(&rt, fut);
114        }
115    }
116
117    /// Spawn a future on the current thread. This does not create a new Arbiter
118    /// or Arbiter address, it is simply a helper for spawning futures on the current
119    /// thread.
120    ///
121    /// # Panics
122    ///
123    /// This function panics if ntex system is not running.
124    #[inline]
125    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
126    where
127        F: Future + 'static,
128    {
129        let data = crate::Data::load();
130        if let Some(mut data) = data {
131            tok_io::task::spawn_local(async move {
132                tok_io::pin!(f);
133                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
134            })
135        } else {
136            tok_io::task::spawn_local(f)
137        }
138    }
139
140    #[derive(Clone, Debug)]
141    /// Handle to the runtime.
142    pub struct Handle(tok_io::runtime::Handle);
143
144    impl Handle {
145        #[inline]
146        pub fn current() -> Self {
147            Self(tok_io::runtime::Handle::current())
148        }
149
150        #[inline]
151        /// Wake up runtime
152        pub fn notify(&self) {}
153
154        #[inline]
155        /// Spawns a new asynchronous task, returning a [`Task`] for it.
156        ///
157        /// Spawning a task enables the task to execute concurrently to other tasks.
158        /// There is no guarantee that a spawned task will execute to completion.
159        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
160        where
161            F: Future + Send + 'static,
162            F::Output: Send + 'static,
163        {
164            self.0.spawn(future)
165        }
166    }
167}
168
169#[allow(dead_code)]
170#[cfg(feature = "compio")]
171mod compio {
172    use std::task::{Context, Poll, ready};
173    use std::{fmt, future::Future, future::poll_fn, pin::Pin};
174
175    use compio_driver::DriverType;
176    use compio_runtime::Runtime;
177
178    /// Runs the provided future, blocking the current thread until the future
179    /// completes.
180    pub fn block_on<F: Future<Output = ()>>(fut: F) {
181        log::info!(
182            "Starting compio runtime, driver {:?}",
183            compio_runtime::Runtime::try_with_current(|rt| rt.driver_type())
184                .unwrap_or(DriverType::Poll)
185        );
186        let rt = Runtime::new().unwrap();
187        rt.block_on(fut);
188    }
189
190    /// Spawns a blocking task.
191    ///
192    /// The task will be spawned onto a thread pool specifically dedicated
193    /// to blocking tasks. This is useful to prevent long-running synchronous
194    /// operations from blocking the main futures executor.
195    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
196    where
197        F: FnOnce() -> T + Send + Sync + 'static,
198        T: Send + 'static,
199    {
200        JoinHandle {
201            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
202        }
203    }
204
205    /// Spawn a future on the current thread. This does not create a new Arbiter
206    /// or Arbiter address, it is simply a helper for spawning futures on the current
207    /// thread.
208    ///
209    /// # Panics
210    ///
211    /// This function panics if ntex system is not running.
212    #[inline]
213    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
214    where
215        F: Future + 'static,
216    {
217        let fut = if let Some(mut data) = crate::Data::load() {
218            compio_runtime::spawn(async move {
219                let mut f = std::pin::pin!(f);
220                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
221            })
222        } else {
223            compio_runtime::spawn(f)
224        };
225
226        JoinHandle {
227            fut: Some(Either::Compio(fut)),
228        }
229    }
230
231    #[derive(Debug, Copy, Clone)]
232    pub struct JoinError;
233
234    impl fmt::Display for JoinError {
235        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236            write!(f, "JoinError")
237        }
238    }
239
240    impl std::error::Error for JoinError {}
241
242    enum Either<T> {
243        Compio(compio_runtime::JoinHandle<T>),
244        Spawn(oneshot::Receiver<T>),
245    }
246
247    pub struct JoinHandle<T> {
248        fut: Option<Either<T>>,
249    }
250
251    impl<T> JoinHandle<T> {
252        pub fn is_finished(&self) -> bool {
253            match &self.fut {
254                Some(Either::Compio(fut)) => fut.is_finished(),
255                Some(Either::Spawn(fut)) => fut.is_closed(),
256                None => true,
257            }
258        }
259    }
260
261    impl<T> Drop for JoinHandle<T> {
262        fn drop(&mut self) {
263            if let Some(Either::Compio(fut)) = self.fut.take() {
264                fut.detach();
265            }
266        }
267    }
268
269    impl<T> Future for JoinHandle<T> {
270        type Output = Result<T, JoinError>;
271
272        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273            Poll::Ready(match self.fut.as_mut() {
274                Some(Either::Compio(fut)) => {
275                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
276                }
277                Some(Either::Spawn(fut)) => {
278                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
279                }
280                None => Err(JoinError),
281            })
282        }
283    }
284
285    #[derive(Clone, Debug)]
286    pub struct Handle(crate::Arbiter);
287
288    impl Handle {
289        pub fn current() -> Self {
290            Self(crate::Arbiter::current())
291        }
292
293        pub fn notify(&self) {}
294
295        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
296        where
297            F: Future + Send + 'static,
298            F::Output: Send + 'static,
299        {
300            let (tx, rx) = oneshot::channel();
301            self.0.spawn(async move {
302                let result = future.await;
303                let _ = tx.send(result);
304            });
305            JoinHandle {
306                fut: Some(Either::Spawn(rx)),
307            }
308        }
309    }
310}
311
312#[allow(dead_code)]
313#[cfg(feature = "neon")]
314mod neon {
315    use std::task::{Context, Poll, ready};
316    use std::{fmt, future::Future, future::poll_fn, pin::Pin};
317
318    use ntex_neon::Runtime;
319
320    /// Runs the provided future, blocking the current thread until the future
321    /// completes.
322    pub fn block_on<F: Future<Output = ()>>(fut: F) {
323        let rt = Runtime::new().unwrap();
324        log::info!(
325            "Starting neon runtime, driver {:?}",
326            rt.driver_type().name()
327        );
328
329        rt.block_on(fut);
330    }
331
332    /// Spawn a future on the current thread. This does not create a new Arbiter
333    /// or Arbiter address, it is simply a helper for spawning futures on the current
334    /// thread.
335    ///
336    /// # Panics
337    ///
338    /// This function panics if ntex system is not running.
339    #[inline]
340    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
341    where
342        F: Future + 'static,
343    {
344        let task = if let Some(mut data) = crate::Data::load() {
345            ntex_neon::spawn(async move {
346                let mut f = std::pin::pin!(f);
347                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
348            })
349        } else {
350            ntex_neon::spawn(f)
351        };
352
353        JoinHandle {
354            task: Some(Either::Task(task)),
355        }
356    }
357
358    /// Spawns a blocking task.
359    ///
360    /// The task will be spawned onto a thread pool specifically dedicated
361    /// to blocking tasks. This is useful to prevent long-running synchronous
362    /// operations from blocking the main futures executor.
363    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
364    where
365        F: FnOnce() -> T + Send + Sync + 'static,
366        T: Send + 'static,
367    {
368        JoinHandle {
369            task: Some(Either::Blocking(ntex_neon::spawn_blocking(f))),
370        }
371    }
372
373    #[derive(Clone, Debug)]
374    pub struct Handle(ntex_neon::Handle);
375
376    impl Handle {
377        pub fn current() -> Self {
378            Self(ntex_neon::Handle::current())
379        }
380
381        pub fn notify(&self) {
382            let _ = self.0.notify();
383        }
384
385        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
386        where
387            F: Future + Send + 'static,
388            F::Output: Send + 'static,
389        {
390            JoinHandle {
391                task: Some(Either::Task(self.0.spawn(future))),
392            }
393        }
394    }
395
396    enum Either<T> {
397        Task(ntex_neon::Task<T>),
398        Blocking(ntex_neon::JoinHandle<T>),
399    }
400
401    /// A spawned task.
402    pub struct JoinHandle<T> {
403        task: Option<Either<T>>,
404    }
405
406    impl<T> JoinHandle<T> {
407        pub fn is_finished(&self) -> bool {
408            match &self.task {
409                Some(Either::Task(fut)) => fut.is_finished(),
410                Some(Either::Blocking(fut)) => fut.is_closed(),
411                None => true,
412            }
413        }
414    }
415
416    impl<T> Drop for JoinHandle<T> {
417        fn drop(&mut self) {
418            if let Some(Either::Task(fut)) = self.task.take() {
419                fut.detach();
420            }
421        }
422    }
423
424    impl<T> Future for JoinHandle<T> {
425        type Output = Result<T, JoinError>;
426
427        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
428            Poll::Ready(match self.task.as_mut() {
429                Some(Either::Task(fut)) => Ok(ready!(Pin::new(fut).poll(cx))),
430                Some(Either::Blocking(fut)) => ready!(Pin::new(fut).poll(cx))
431                    .map_err(|_| JoinError)
432                    .and_then(|res| res.map_err(|_| JoinError)),
433                None => Err(JoinError),
434            })
435        }
436    }
437
438    #[derive(Debug, Copy, Clone)]
439    pub struct JoinError;
440
441    impl fmt::Display for JoinError {
442        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
443            write!(f, "JoinError")
444        }
445    }
446
447    impl std::error::Error for JoinError {}
448}
449
450#[cfg(feature = "tokio")]
451pub use self::tokio::*;
452
453#[cfg(feature = "compio")]
454pub use self::compio::*;
455
456#[cfg(feature = "neon")]
457pub use self::neon::*;
458
459#[allow(dead_code)]
460#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
461mod no_rt {
462    use std::task::{Context, Poll};
463    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
464
465    /// Runs the provided future, blocking the current thread until the future
466    /// completes.
467    pub fn block_on<F: Future<Output = ()>>(_: F) {
468        panic!("async runtime is not configured");
469    }
470
471    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
472    where
473        F: Future + 'static,
474    {
475        unimplemented!()
476    }
477
478    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
479    where
480        F: FnOnce() -> T + Send + Sync + 'static,
481        T: Send + 'static,
482    {
483        unimplemented!()
484    }
485
486    /// Blocking operation completion future. It resolves with results
487    /// of blocking function execution.
488    #[allow(clippy::type_complexity)]
489    pub struct JoinHandle<T> {
490        t: PhantomData<T>,
491    }
492
493    impl<T> JoinHandle<T> {
494        pub fn is_finished(&self) -> bool {
495            true
496        }
497    }
498
499    impl<T> Future for JoinHandle<T> {
500        type Output = Result<T, JoinError>;
501
502        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
503            todo!()
504        }
505    }
506
507    #[derive(Debug, Copy, Clone)]
508    pub struct JoinError;
509
510    impl fmt::Display for JoinError {
511        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
512            write!(f, "JoinError")
513        }
514    }
515
516    impl std::error::Error for JoinError {}
517
518    #[derive(Clone, Debug)]
519    /// Handle to the runtime.
520    pub struct Handle;
521
522    impl Handle {
523        #[inline]
524        pub fn current() -> Self {
525            Self
526        }
527
528        #[inline]
529        /// Wake up runtime
530        pub fn notify(&self) {}
531
532        #[inline]
533        /// Spawns a new asynchronous task, returning a [`Task`] for it.
534        ///
535        /// Spawning a task enables the task to execute concurrently to other tasks.
536        /// There is no guarantee that a spawned task will execute to completion.
537        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
538        where
539            F: Future + Send + 'static,
540            F::Output: Send + 'static,
541        {
542            panic!("async runtime is not configured");
543        }
544    }
545}
546
547#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
548pub use self::no_rt::*;