ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3use std::{cell::Cell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::{Id, PingRecord, System};
12
thread_local! {
    // Per-thread slot holding the registered spawn callbacks.
    // It starts out null; `spawn_cbs` stores a pointer to a leaked `Callbacks`
    // here, and `Data::load` reads it back when a task is spawned.
    static CB: Cell<*const Callbacks> = const { Cell::new(ptr::null()) };
}
16
/// User supplied hooks that are run around spawned tasks.
///
/// The raw pointers exchanged with these hooks are never dereferenced by
/// the code in this file; they are only passed from one hook to the next.
struct Callbacks {
    /// Called by `Data::load`. Returning `None` means the task is not
    /// wrapped; returning `Some(ptr)` stores `ptr` for the other hooks.
    before: Box<dyn Fn() -> Option<*const ()>>,
    /// Called by `Data::run` ahead of the wrapped closure with the pointer
    /// from `before`; its return value is handed to `exit`.
    enter: Box<dyn Fn(*const ()) -> *const ()>,
    /// Called by `Data::run` after the wrapped closure with the pointer
    /// that `enter` returned.
    exit: Box<dyn Fn(*const ())>,
    /// Called when the `Data` value is dropped, with the pointer from `before`.
    after: Box<dyn Fn(*const ())>,
}
23
/// Spawn context captured for a single task.
///
/// Created by `Data::load`; dropping it invokes the `after` callback.
struct Data {
    /// Callbacks that were registered on the thread that created this value.
    cb: &'static Callbacks,
    /// Opaque pointer returned by the `before` callback.
    ptr: *const (),
}
28
29impl Data {
30    fn load() -> Option<Data> {
31        let cb = CB.with(|cb| cb.get());
32
33        if let Some(cb) = unsafe { cb.as_ref() } {
34            if let Some(ptr) = (*cb.before)() {
35                return Some(Data { cb, ptr });
36            }
37        }
38        None
39    }
40
41    fn run<F, R>(&mut self, f: F) -> R
42    where
43        F: FnOnce() -> R,
44    {
45        let ptr = (*self.cb.enter)(self.ptr);
46        let result = f();
47        (*self.cb.exit)(ptr);
48        result
49    }
50}
51
52impl Drop for Data {
53    fn drop(&mut self) {
54        (*self.cb.after)(self.ptr)
55    }
56}
57
58/// # Safety
59///
60/// The user must ensure that the pointer returned by `before` has a `'static` lifetime.
61/// This pointer will be owned by the spawned task for the duration of that task, and
62/// ownership will be returned to the user at the end of the task via `after`.
63/// The pointer remains opaque to the runtime.
64///
65/// # Panics
66///
67/// Panics if spawn callbacks have already been set.
68pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
69    before: FBefore,
70    enter: FEnter,
71    exit: FExit,
72    after: FAfter,
73) where
74    FBefore: Fn() -> Option<*const ()> + 'static,
75    FEnter: Fn(*const ()) -> *const () + 'static,
76    FExit: Fn(*const ()) + 'static,
77    FAfter: Fn(*const ()) + 'static,
78{
79    CB.with(|cb| {
80        if !cb.get().is_null() {
81            panic!("Spawn callbacks already set");
82        }
83
84        let new: *mut Callbacks = Box::leak(Box::new(Callbacks {
85            before: Box::new(before),
86            enter: Box::new(enter),
87            exit: Box::new(exit),
88            after: Box::new(after),
89        }));
90        cb.replace(new);
91    });
92}
93
94/// # Safety
95///
96/// The user must ensure that the pointer returned by `before` has a `'static` lifetime.
97/// This pointer will be owned by the spawned task for the duration of that task, and
98/// ownership will be returned to the user at the end of the task via `after`.
99/// The pointer remains opaque to the runtime.
100///
101/// Returns false if spawn callbacks have already been set.
102pub unsafe fn spawn_cbs_try<FBefore, FEnter, FExit, FAfter>(
103    before: FBefore,
104    enter: FEnter,
105    exit: FExit,
106    after: FAfter,
107) -> bool
108where
109    FBefore: Fn() -> Option<*const ()> + 'static,
110    FEnter: Fn(*const ()) -> *const () + 'static,
111    FExit: Fn(*const ()) + 'static,
112    FAfter: Fn(*const ()) + 'static,
113{
114    CB.with(|cb| {
115        if !cb.get().is_null() {
116            false
117        } else {
118            unsafe {
119                spawn_cbs(before, enter, exit, after);
120            }
121            true
122        }
123    })
124}
125
#[allow(dead_code)]
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{Future, poll_fn};
    pub use tok_io::task::{JoinError, JoinHandle, spawn_blocking};

    /// Drives `fut` to completion, blocking the current thread meanwhile.
    ///
    /// The future is run inside a fresh `LocalSet`. If a tokio runtime handle
    /// is already available it is reused; otherwise a current-thread runtime
    /// is built for the call.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        match tok_io::runtime::Handle::try_current() {
            Ok(handle) => {
                log::debug!("Use existing tokio runtime and block on future");
                let local = tok_io::task::LocalSet::new();
                handle.block_on(local.run_until(fut));
            }
            Err(_) => {
                log::debug!("Create tokio runtime and block on future");

                let runtime = tok_io::runtime::Builder::new_current_thread()
                    .enable_all()
                    //.unhandled_panic(tok_io::runtime::UnhandledPanic::ShutdownRuntime)
                    .build()
                    .unwrap();
                let local = tok_io::task::LocalSet::new();
                local.block_on(&runtime, fut);
            }
        }
    }

    /// Spawns a future onto the current thread.
    ///
    /// No new Arbiter or Arbiter address is created; this is only a helper
    /// for running futures on the current thread. When spawn callbacks are
    /// registered and `before` returns a pointer, every poll of the future
    /// is wrapped by the `enter`/`exit` callbacks.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let Some(mut data) = crate::Data::load() else {
            return tok_io::task::spawn_local(f);
        };

        tok_io::task::spawn_local(async move {
            tok_io::pin!(f);
            poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
        })
    }

    /// Handle to the runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Returns a handle wrapping the current tokio runtime handle.
        #[inline]
        pub fn current() -> Self {
            let inner = tok_io::runtime::Handle::current();
            Self(inner)
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task and returns its [`JoinHandle`].
        ///
        /// Spawning a task enables the task to execute concurrently to other tasks.
        /// There is no guarantee that a spawned task will execute to completion.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Self(inner) = self;
            inner.spawn(future)
        }
    }
}
200
#[allow(dead_code)]
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use compio_driver::DriverType;
    use compio_runtime::Runtime;

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if the compio runtime cannot be created.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let rt = Runtime::new().unwrap();

        // Report the driver of the runtime that will actually run `fut`.
        // Asking for the *current* runtime before one has been created
        // would normally fall back to a hard-coded default and log a
        // driver that is not necessarily the one in use.
        let driver: DriverType = rt.driver_type();
        log::info!("Starting compio runtime, driver {:?}", driver);

        rt.block_on(fut);
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        JoinHandle {
            fut: Some(Either::Compio(compio_runtime::spawn_blocking(f))),
        }
    }

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// When spawn callbacks are registered and `before` returns a pointer,
    /// every poll of the future is wrapped by the `enter`/`exit` callbacks.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let fut = if let Some(mut data) = crate::Data::load() {
            compio_runtime::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            })
        } else {
            compio_runtime::spawn(f)
        };

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error returned when a spawned task did not produce a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// The two ways a task result can be delivered.
    enum Either<T> {
        /// Task handle produced by the compio runtime.
        Compio(compio_runtime::JoinHandle<T>),
        /// Result channel used by `Handle::spawn`.
        Spawn(oneshot::Receiver<T>),
    }

    /// Handle that resolves with the output of a spawned task.
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task is considered finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                // NOTE(review): relies on `oneshot::Receiver::is_closed` to
                // signal completion; confirm its semantics once a value
                // has been sent.
                Some(Either::Spawn(fut)) => fut.is_closed(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Detach the compio task when the handle is dropped, rather
            // than just dropping the task handle.
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Resolves with the task output; any error reported by the
        /// underlying handle is mapped to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Handle to the runtime, backed by the current [`crate::Arbiter`].
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Returns a handle for the current arbiter.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the compio backend.
        pub fn notify(&self) {}

        /// Spawns `future` through the arbiter and returns a handle that
        /// resolves with its output.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                // The receiver may already be gone; the result is then
                // discarded on purpose.
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
343
#[allow(dead_code)]
#[cfg(feature = "neon")]
mod neon {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    use ntex_neon::Runtime;

    /// Drives `fut` to completion on a newly created neon runtime, blocking
    /// the current thread meanwhile.
    pub fn block_on<F: Future<Output = ()>>(fut: F) {
        let runtime = Runtime::new().unwrap();
        log::info!(
            "Starting neon runtime, driver {:?}",
            runtime.driver_type().name()
        );

        runtime.block_on(fut);
    }

    /// Spawns a future onto the current thread.
    ///
    /// No new Arbiter or Arbiter address is created; this is only a helper
    /// for running futures on the current thread. When spawn callbacks are
    /// registered and `before` returns a pointer, every poll of the future
    /// is wrapped by the `enter`/`exit` callbacks.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let task = match crate::Data::load() {
            Some(mut data) => ntex_neon::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            }),
            None => ntex_neon::spawn(f),
        };

        JoinHandle {
            task: Some(Either::Task(task)),
        }
    }

    /// Spawns a blocking task.
    ///
    /// The task will be spawned onto a thread pool specifically dedicated
    /// to blocking tasks. This is useful to prevent long-running synchronous
    /// operations from blocking the main futures executor.
    pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        let blocking = ntex_neon::spawn_blocking(f);
        JoinHandle {
            task: Some(Either::Blocking(blocking)),
        }
    }

    /// Handle to the neon runtime.
    #[derive(Clone, Debug)]
    pub struct Handle(ntex_neon::Handle);

    impl Handle {
        /// Returns a handle wrapping the current neon runtime handle.
        pub fn current() -> Self {
            let inner = ntex_neon::Handle::current();
            Self(inner)
        }

        /// Wake up runtime; the outcome of the notification is ignored.
        pub fn notify(&self) {
            let _ = self.0.notify();
        }

        /// Spawns `future` through the runtime handle and returns a
        /// [`JoinHandle`] for it.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let task = self.0.spawn(future);
            JoinHandle {
                task: Some(Either::Task(task)),
            }
        }
    }

    /// The two kinds of work a [`JoinHandle`] can wait for.
    enum Either<T> {
        /// Asynchronous task.
        Task(ntex_neon::Task<T>),
        /// Blocking task handle.
        Blocking(ntex_neon::JoinHandle<T>),
    }

    /// A spawned task.
    pub struct JoinHandle<T> {
        task: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the underlying task is considered finished.
        pub fn is_finished(&self) -> bool {
            let Some(inner) = self.task.as_ref() else {
                return true;
            };

            match inner {
                Either::Task(task) => task.is_finished(),
                Either::Blocking(handle) => handle.is_closed(),
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Asynchronous tasks are detached when the handle is dropped;
            // a blocking handle is simply dropped.
            let Some(Either::Task(task)) = self.task.take() else {
                return;
            };
            task.detach();
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Resolves with the task output; any failure reported by a
        /// blocking task is mapped to [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.task.as_mut() {
                None => Err(JoinError),
                Some(Either::Task(task)) => match Pin::new(task).poll(cx) {
                    Poll::Ready(value) => Ok(value),
                    Poll::Pending => return Poll::Pending,
                },
                Some(Either::Blocking(handle)) => match Pin::new(handle).poll(cx) {
                    Poll::Ready(Ok(Ok(value))) => Ok(value),
                    Poll::Ready(_) => Err(JoinError),
                    Poll::Pending => return Poll::Pending,
                },
            };

            Poll::Ready(outcome)
        }
    }

    /// Error returned when a spawned task did not produce a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
481
482#[cfg(feature = "tokio")]
483pub use self::tokio::*;
484
485#[cfg(feature = "compio")]
486pub use self::compio::*;
487
488#[cfg(feature = "neon")]
489pub use self::neon::*;
490
#[allow(dead_code)]
#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
mod no_rt {
    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Shared failure path for every entry point of this placeholder backend.
    ///
    /// All stubs report the same message so that a missing runtime feature
    /// is diagnosed identically regardless of which function was reached.
    #[cold]
    #[track_caller]
    fn not_configured() -> ! {
        panic!("async runtime is not configured");
    }

    /// Runs the provided future, blocking the current thread until the future
    /// completes.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        not_configured()
    }

    /// Spawn a future on the current thread.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        not_configured()
    }

    /// Spawns a blocking task.
    ///
    /// # Panics
    ///
    /// Always panics: no async runtime feature is enabled.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        not_configured()
    }

    /// Blocking operation completion future. It resolves with results
    /// of blocking function execution.
    ///
    /// No function in this module returns a value of this type; it exists
    /// so that the public API has the same shape as the real backends.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports the task as finished.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Panics with the same message as the other stubs.
        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            not_configured()
        }
    }

    /// Error returned when a spawned task did not produce a value.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    #[derive(Clone, Debug)]
    /// Handle to the runtime.
    pub struct Handle;

    impl Handle {
        /// Returns the placeholder handle.
        #[inline]
        pub fn current() -> Self {
            Self
        }

        #[inline]
        /// Wake up runtime. This is a no-op without a runtime.
        pub fn notify(&self) {}

        #[inline]
        /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
        ///
        /// # Panics
        ///
        /// Always panics: no async runtime feature is enabled.
        pub fn spawn<F>(&self, _: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            not_configured()
        }
    }
}
578
579#[cfg(all(not(feature = "tokio"), not(feature = "compio"), not(feature = "neon")))]
580pub use self::no_rt::*;