ntex_rt/
lib.rs

1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2//! A runtime implementation that runs everything on the current thread.
3mod arbiter;
4mod builder;
5mod driver;
6mod handle;
7mod pool;
8mod rt;
9mod system;
10mod task;
11
12pub use self::arbiter::Arbiter;
13pub use self::builder::{Builder, SystemRunner};
14pub use self::driver::{BlockFuture, Driver, DriverType, Notify, PollResult, Runner};
15pub use self::pool::{BlockingError, BlockingResult};
16pub use self::rt::{Runtime, RuntimeBuilder};
17pub use self::system::{Id, PingRecord, System};
18pub use self::task::{task_callbacks, task_opt_callbacks};
19
20/// Spawns a blocking task in a new thread, and wait for it.
21///
22/// The task will not be cancelled even if the future is dropped.
23pub fn spawn_blocking<F, R>(f: F) -> BlockingResult<R>
24where
25    F: FnOnce() -> R + Send + 'static,
26    R: Send + 'static,
27{
28    System::current().spawn_blocking(f)
29}
30
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{Future, poll_fn};
    pub use tok_io::task::{JoinError, JoinHandle};

    /// Spawns a future onto the current thread.
    ///
    /// This is only a convenience helper: no new Arbiter (or Arbiter address)
    /// is created. When task data is available from `crate::task::Data::load()`,
    /// every poll of the future is routed through `Data::run`; otherwise the
    /// future is handed to `spawn_local` untouched.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        match crate::task::Data::load() {
            Some(mut callbacks) => tok_io::task::spawn_local(async move {
                // Pin the future on the task's own stack so it can be polled by hand.
                let mut task = std::pin::pin!(f);
                poll_fn(|cx| callbacks.run(|| task.as_mut().poll(cx))).await
            }),
            None => tok_io::task::spawn_local(f),
        }
    }

    /// Handle to the runtime.
    ///
    /// Thin wrapper around a tokio runtime handle.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Returns a handle wrapping the current tokio runtime handle.
        #[inline]
        pub fn current() -> Self {
            Handle(tok_io::runtime::Handle::current())
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the tokio backend.
        #[inline]
        pub fn notify(&self) {}

        /// Spawns a new asynchronous task on the wrapped runtime handle and
        /// returns the [`JoinHandle`] for it.
        ///
        /// Spawning a task enables the task to execute concurrently to other tasks.
        /// There is no guarantee that a spawned task will execute to completion.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Self(inner) = self;
            inner.spawn(future)
        }
    }
}
86
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    /// Spawn a future on the current thread. This does not create a new Arbiter
    /// or Arbiter address, it is simply a helper for spawning futures on the current
    /// thread.
    ///
    /// When task data is available from `crate::task::Data::load()`, every poll
    /// of the future is routed through `Data::run`; otherwise the future is
    /// spawned as is.
    ///
    /// # Panics
    ///
    /// This function panics if ntex system is not running.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let fut = if let Some(mut data) = crate::task::Data::load() {
            compio_runtime::spawn(async move {
                // Pin on the task's stack so the future can be polled by hand.
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            })
        } else {
            compio_runtime::spawn(f)
        };

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error returned by [`JoinHandle`] when the task's output could not be
    /// obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// Source of a task result: either a compio task handle (local spawn) or
    /// a oneshot receiver (spawn through [`Handle`]).
    enum Either<T> {
        Compio(compio_runtime::JoinHandle<T>),
        Spawn(oneshot::Receiver<T>),
    }

    /// Future that resolves to the output of a spawned task.
    pub struct JoinHandle<T> {
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Returns `true` if the task is no longer running.
        ///
        /// For tasks spawned through [`Handle::spawn`] this is the case when the
        /// result is already waiting in the channel, or when the channel is
        /// closed (sender dropped without a result, or result already received).
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                // `is_closed` alone does not cover a result that was sent but
                // not yet received, so a completed task would be reported as
                // still running; `has_message` covers that state.
                Some(Either::Spawn(fut)) => fut.is_closed() || fut.has_message(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Detach the compio task rather than just dropping its handle
            // (presumably so the task keeps running; confirm against compio docs).
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        /// Polls the underlying task handle or channel; any failure reported by
        /// it is collapsed into [`JoinError`].
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Handle to the runtime.
    ///
    /// Wraps the [`crate::Arbiter`] that was current when the handle was created.
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Returns a handle wrapping the current arbiter.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Wake up runtime.
        ///
        /// This is a no-op for the compio backend.
        pub fn notify(&self) {}

        /// Spawns `future` on the wrapped arbiter and returns a [`JoinHandle`]
        /// that resolves to its output.
        ///
        /// The output is delivered through a oneshot channel; if the handle has
        /// been dropped by the time the future completes, the output is
        /// discarded.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                // A send error only means the receiver is gone; nothing to do.
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
198
199#[cfg(feature = "tokio")]
200pub use self::tokio::*;
201
202#[cfg(all(feature = "compio", not(feature = "tokio")))]
203pub use self::compio::*;
204
205pub mod default_runtime {
206    use std::fmt;
207    use std::future::{Future, poll_fn};
208
209    pub use crate::rt::{Handle, Runtime};
210
211    #[derive(Debug, Copy, Clone)]
212    pub struct JoinError;
213
214    impl fmt::Display for JoinError {
215        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216            write!(f, "JoinError")
217        }
218    }
219
220    impl std::error::Error for JoinError {}
221
222    pub fn spawn<F>(fut: F) -> crate::handle::JoinHandle<F::Output>
223    where
224        F: Future + 'static,
225    {
226        if let Some(mut data) = crate::task::Data::load() {
227            Runtime::with_current(|rt| {
228                rt.spawn(async move {
229                    let mut f = std::pin::pin!(fut);
230                    poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
231                })
232            })
233        } else {
234            Runtime::with_current(|rt| rt.spawn(fut))
235        }
236    }
237}
238
239#[cfg(all(not(feature = "tokio"), not(feature = "compio")))]
240pub use self::default_runtime::*;