ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
13thread_local! {
14    static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
15}
16
17struct Callbacks {
18    before: Box<dyn Fn() -> Option<*const ()>>,
19    enter: Box<dyn Fn(*const ()) -> *const ()>,
20    exit: Box<dyn Fn(*const ())>,
21    after: Box<dyn Fn(*const ())>,
22}
23
24struct Data {
25    cb: &'static Callbacks,
26    ptr: *const (),
27}
28
29impl Data {
30    fn load() -> Option<Data> {
31        let cb = CB.with(|cb| cb.get());
32
33        if let Some(cb) = unsafe { cb.as_ref() } {
34            if let Some(ptr) = (*cb.before)() {
35                return Some(Data { cb, ptr });
36            }
37        }
38        None
39    }
40
41    fn run<F, R>(&mut self, f: F) -> R
42    where
43        F: FnOnce() -> R,
44    {
45        let ptr = (*self.cb.enter)(self.ptr);
46        let result = f();
47        (*self.cb.exit)(ptr);
48        result
49    }
50}
51
52impl Drop for Data {
53    fn drop(&mut self) {
54        (*self.cb.after)(self.ptr)
55    }
56}
57
58/// # Safety
59///
60/// The user must ensure that the pointer returned by `before` has a `'static` lifetime.
61/// This pointer will be owned by the spawned task for the duration of that task, and
62/// ownership will be returned to the user at the end of the task via `after`.
63/// The pointer remains opaque to the runtime.
64///
65/// # Panics
66///
67/// Panics if spawn callbacks have already been set.
68pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69    before: FBefore,
70    enter: FEnter,
71    exit: FExit,
72    after: FAfter,
73) where
74    FBefore: Fn() -> Option<*const ()> + 'static,
75    FEnter: Fn(*const ()) -> *const () + 'static,
76    FExit: Fn(*const ()) + 'static,
77    FAfter: Fn(*const ()) + 'static,
78{
79    CB.with(|cb| {
80        if !cb.get().is_null() {
81            panic!("Spawn callbacks already set");
82        }
83
84        let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
85            before: Box::new(before),
86            enter: Box::new(enter),
87            exit: Box::new(exit),
88            after: Box::new(after),
89        }));
90        cb.replace(new);
91    });
92}
93
94/// # Safety
95///
96/// The user must ensure that the pointer returned by `before` has a `'static` lifetime.
97/// This pointer will be owned by the spawned task for the duration of that task, and
98/// ownership will be returned to the user at the end of the task via `after`.
99/// The pointer remains opaque to the runtime.
100///
101/// Returns false if spawn callbacks have already been set.
102pub unsafe fn spawn_cbs_try<FBefore, FEnter, FExit, FAfter>(
103    before: FBefore,
104    enter: FEnter,
105    exit: FExit,
106    after: FAfter,
107) -> bool
108where
109    FBefore: Fn() -> Option<*const ()> + 'static,
110    FEnter: Fn(*const ()) -> *const () + 'static,
111    FExit: Fn(*const ()) + 'static,
112    FAfter: Fn(*const ()) + 'static,
113{
114    CB.with(|cb| {
115        if !cb.get().is_null() {
116            false
117        } else {
118            unsafe {
119                spawn_cbs(before, enter, exit, after);
120            }
121            true
122        }
123    })
124}
125
126#[allow(dead_code)]
127#[cfg(feature = "tokio")]
128mod tokio {
129    use std::future::{Future, poll_fn};
130    pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};
131
132    /// Runs the provided future, blocking the current thread until the future
133    /// completes.
134    pub fn block_on<F: Future<Output = ()>>(fut: F) {
135        if let Ok(hnd) = tok_io::runtime::Handle::try_current() {
136            log::debug!("Use existing tokio runtime and block on future");
137            hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
138        } else {
139            log::debug!("Create tokio runtime and block on future");
140
141            let rt = tok_io::runtime::Builder::new_current_thread()
142                .enable_all()
143                //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
144                .build()
145                .unwrap();
146            tok_io::task::LocalSet::new().block_on(&rt, fut);
147        }
148    }
149
150    /// Spawn a future on the current thread. This does not create a new Arbiter
151    /// or Arbiter address, it is simply a helper for spawning futures on the current
152    /// thread.
153    ///
154    /// # Panics
155    ///
156    /// This function panics if ntex system is not running.
157    #[inline]
158    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
159    where
160        F: Future + 'static,
161    {
162        let data = crate::Data::load();
163        if let Some(mut data) = data {
164            tok_io::task::spawn_local(async move {
165                tok_io::pin!(f);
166                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
167            })
168        } else {
169            tok_io::task::spawn_local(f)
170        }
171    }
172
173    #[derive(Clone, Debug)]
174    /// Handle to the runtime.
175    pub struct Handle(tok_io::runtime::Handle);
176
177    impl Handle {
178        #[inline]
179        pub fn current() -> Self {
180            Self(tok_io::runtime::Handle::current())
181        }
182
183        #[inline]
184        /// Wake up runtime
185        pub fn notify(&self) {}
186
187        #[inline]
188        /// Spawns a new asynchronous task, returning a [`Task`] for it.
189        ///
190        /// Spawning a task enables the task to execute concurrently to other tasks.
191        /// There is no guarantee that a spawned task will execute to completion.
192        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
193        where
194            F: Future + Send + 'static,
195            F::Output: Send + 'static,
196        {
197            self.0.spawn(future)
198        }
199    }
200}
201
202#[allow(dead_code)]
203#[cfg(feature = "compio")]
204mod compio {
205    use std::task::{Context, Poll, ready};
206    use std::{fmt, future::Future, future::poll_fn, pin::Pin};
207
208    use compio_driver::DriverType;
209    use compio_runtime::Runtime;
210
211    /// Runs the provided future, blocking the current thread until the future
212    /// completes.
213    pub fn block_on<F: Future<Output = ()>>(fut: F) {
214        log::info!(
215            "Starting compio runtime, driver {:?}",
216            compio_runtime::Runtime::try_with_current(|rt| rt.driver_type())
217                .unwrap_or(DriverType::Poll)
218        );
219        let rt = Runtime::new().unwrap();
220        rt.block_on(fut);
221    }
222
223    /// Spawns a blocking task.
224    ///
225    /// The task will be spawned onto a thread pool specifically dedicated
226    /// to blocking tasks. This is useful to prevent long-running synchronous
227    /// operations from blocking the main futures executor.
228    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
229    where
230        F: FnOnce() -> T + Send + Sync + 'static,
231        T: Send + 'static,
232    {
233        JoinHandle {
234            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
235        }
236    }
237
238    /// Spawn a future on the current thread. This does not create a new Arbiter
239    /// or Arbiter address, it is simply a helper for spawning futures on the current
240    /// thread.
241    ///
242    /// # Panics
243    ///
244    /// This function panics if ntex system is not running.
245    #[inline]
246    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
247    where
248        F: Future + 'static,
249    {
250        let fut = if let Some(mut data) = crate::Data::load() {
251            compio_runtime::spawn(async move {
252                let mut f = std::pin::pin!(f);
253                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
254            })
255        } else {
256            compio_runtime::spawn(f)
257        };
258
259        JoinHandle {
260            fut: Some(Either::Compio(fut)),
261        }
262    }
263
264    #[derive(Debug, Copy, Clone)]
265    pub struct JoinError;
266
267    impl fmt::Display for JoinError {
268        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269            write!(f, "JoinError")
270        }
271    }
272
273    impl std::error::Error for JoinError {}
274
275    enum Either<T> {
276        Compio(compio_runtime::JoinHandle<T>),
277        Spawn(oneshot::Receiver<T>),
278    }
279
280    pub struct JoinHandle<T> {
281        fut: Option<Either<T>>,
282    }
283
284    impl<T> JoinHandle<T> {
285        pub fn is_finished(&self) -> bool {
286            match &self.fut {
287                Some(Either::Compio(fut)) => fut.is_finished(),
288                Some(Either::Spawn(fut)) => fut.is_closed(),
289                None => true,
290            }
291        }
292    }
293
294    impl<T> Drop for JoinHandle<T> {
295        fn drop(&mut self) {
296            if let Some(Either::Compio(fut)) = self.fut.take() {
297                fut.detach();
298            }
299        }
300    }
301
302    impl<T> Future for JoinHandle<T> {
303        type Output = Result<T, JoinError>;
304
305        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
306            Poll::Ready(match self.fut.as_mut() {
307                Some(Either::Compio(fut)) => {
308                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
309                }
310                Some(Either::Spawn(fut)) => {
311                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
312                }
313                None => Err(JoinError),
314            })
315        }
316    }
317
318    #[derive(Clone, Debug)]
319    pub struct Handle(crate::Arbiter);
320
321    impl Handle {
322        pub fn current() -> Self {
323            Self(crate::Arbiter::current())
324        }
325
326        pub fn notify(&self) {}
327
328        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
329        where
330            F: Future + Send + 'static,
331            F::Output: Send + 'static,
332        {
333            let (tx, rx) = oneshot::channel();
334            self.0.spawn(async move {
335                let result = future.await;
336                let _ = tx.send(result);
337            });
338            JoinHandle {
339                fut: Some(Either::Spawn(rx)),
340            }
341        }
342    }
343}
344
345#[allow(dead_code)]
346#[cfg(feature = "neon")]
347mod neon {
348    use std::task::{Context, Poll, ready};
349    use std::{fmt, future::Future, future::poll_fn, pin::Pin};
350
351    use ntex_neon::Runtime;
352
353    /// Runs the provided future, blocking the current thread until the future
354    /// completes.
355    pub fn block_on<F: Future<Output = ()>>(fut: F) {
356        let rt = Runtime::new().unwrap();
357        log::info!(
358            "Starting neon runtime, driver {:?}",
359            rt.driver_type().name()
360        );
361
362        rt.block_on(fut);
363    }
364
365    /// Spawn a future on the current thread. This does not create a new Arbiter
366    /// or Arbiter address, it is simply a helper for spawning futures on the current
367    /// thread.
368    ///
369    /// # Panics
370    ///
371    /// This function panics if ntex system is not running.
372    #[inline]
373    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
374    where
375        F: Future + 'static,
376    {
377        let task = if let Some(mut data) = crate::Data::load() {
378            ntex_neon::spawn(async move {
379                let mut f = std::pin::pin!(f);
380                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
381            })
382        } else {
383            ntex_neon::spawn(f)
384        };
385
386        JoinHandle {
387            task: Some(Either::Task(task)),
388        }
389    }
390
391    /// Spawns a blocking task.
392    ///
393    /// The task will be spawned onto a thread pool specifically dedicated
394    /// to blocking tasks. This is useful to prevent long-running synchronous
395    /// operations from blocking the main futures executor.
396    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
397    where
398        F: FnOnce() -> T + Send + Sync + 'static,
399        T: Send + 'static,
400    {
401        JoinHandle {
402            task: Some(Either::Blocking(ntex_neon::spawn_blocking(f))),
403        }
404    }
405
406    #[derive(Clone, Debug)]
407    pub struct Handle(ntex_neon::Handle);
408
409    impl Handle {
410        pub fn current() -> Self {
411            Self(ntex_neon::Handle::current())
412        }
413
414        pub fn notify(&self) {
415            let _ = self.0.notify();
416        }
417
418        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
419        where
420            F: Future + Send + 'static,
421            F::Output: Send + 'static,
422        {
423            JoinHandle {
424                task: Some(Either::Task(self.0.spawn(future))),
425            }
426        }
427    }
428
429    enum Either<T> {
430        Task(ntex_neon::Task<T>),
431        Blocking(ntex_neon::JoinHandle<T>),
432    }
433
434    /// A spawned task.
435    pub struct JoinHandle<T> {
436        task: Option<Either<T>>,
437    }
438
439    impl<T> JoinHandle<T> {
440        pub fn is_finished(&self) -> bool {
441            match &self.task {
442                Some(Either::Task(fut)) => fut.is_finished(),
443                Some(Either::Blocking(fut)) => fut.is_closed(),
444                None => true,
445            }
446        }
447    }
448
449    impl<T> Drop for JoinHandle<T> {
450        fn drop(&mut self) {
451            if let Some(Either::Task(fut)) = self.task.take() {
452                fut.detach();
453            }
454        }
455    }
456
457    impl<T> Future for JoinHandle<T> {
458        type Output = Result<T, JoinError>;
459
460        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
461            Poll::Ready(match self.task.as_mut() {
462                Some(Either::Task(fut)) => Ok(ready!(Pin::new(fut).poll(cx))),
463                Some(Either::Blocking(fut)) => ready!(Pin::new(fut).poll(cx))
464                    .map_err(|_| JoinError)
465                    .and_then(|res| res.map_err(|_| JoinError)),
466                None => Err(JoinError),
467            })
468        }
469    }
470
471    #[derive(Debug, Copy, Clone)]
472    pub struct JoinError;
473
474    impl fmt::Display for JoinError {
475        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476            write!(f, "JoinError")
477        }
478    }
479
480    impl std::error::Error for JoinError {}
481}
482
483#[cfg(feature = "tokio")]
484pub use self::tokio::*;
485
486#[cfg(feature = "compio")]
487pub use self::compio::*;
488
489#[cfg(feature = "neon")]
490pub use self::neon::*;
491
492#[allow(dead_code)]
493#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
494mod no_rt {
495    use std::task::{Context, Poll};
496    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};
497
498    /// Runs the provided future, blocking the current thread until the future
499    /// completes.
500    pub fn block_on<F: Future<Output = ()>>(_: F) {
501        panic!("async runtime is not configured");
502    }
503
504    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
505    where
506        F: Future + 'static,
507    {
508        unimplemented!()
509    }
510
511    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
512    where
513        F: FnOnce() -> T + Send + Sync + 'static,
514        T: Send + 'static,
515    {
516        unimplemented!()
517    }
518
519    /// Blocking operation completion future. It resolves with results
520    /// of blocking function execution.
521    #[allow(clippy::type_complexity)]
522    pub struct JoinHandle<T> {
523        t: PhantomData<T>,
524    }
525
526    impl<T> JoinHandle<T> {
527        pub fn is_finished(&self) -> bool {
528            true
529        }
530    }
531
532    impl<T> Future for JoinHandle<T> {
533        type Output = Result<T, JoinError>;
534
535        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
536            todo!()
537        }
538    }
539
540    #[derive(Debug, Copy, Clone)]
541    pub struct JoinError;
542
543    impl fmt::Display for JoinError {
544        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545            write!(f, "JoinError")
546        }
547    }
548
549    impl std::error::Error for JoinError {}
550
551    #[derive(Clone, Debug)]
552    /// Handle to the runtime.
553    pub struct Handle;
554
555    impl Handle {
556        #[inline]
557        pub fn current() -> Self {
558            Self
559        }
560
561        #[inline]
562        /// Wake up runtime
563        pub fn notify(&self) {}
564
565        #[inline]
566        /// Spawns a new asynchronous task, returning a [`Task`] for it.
567        ///
568        /// Spawning a task enables the task to execute concurrently to other tasks.
569        /// There is no guarantee that a spawned task will execute to completion.
570        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
571        where
572            F: Future + Send + 'static,
573            F::Output: Send + 'static,
574        {
575            panic!("async runtime is not configured");
576        }
577    }
578}
579
580#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
581pub use self::no_rt::*;