Skip to main content

ntex_rt/
rt.rs

1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
// Scoped thread-local slot pointing at the `Runtime` that is active on this thread.
// In this file it is set for the duration of `Runtime::block_on` and while a
// `Runtime` is being dropped; `Runtime::with_current` reads it.
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
#[derive(Debug)]
/// The async runtime for ntex.
///
/// It is a thread-local runtime: tasks are queued on, and run by, the thread that
/// created it, and it is not meant to be sent to other threads.
//
// NOTE(review): the two fields below (`Cell<bool>` and an `Arc` of a type that is
// declared `Send + Sync` further down in this file) do not appear to make this
// struct `!Send`, so the "stay on one thread" rule may not be compiler-enforced —
// confirm.
pub struct Runtime {
    // Set to `true` by the `block_on` root task once its future has completed;
    // `poll` reports `PollResult::Ready` while it is set.
    stop: Cell<bool>,
    // Task queue shared with every `Handle` and with each task's schedule callback.
    queue: Arc<RunnableQueue>,
}
21
22impl Runtime {
23    /// Create [`Runtime`] with default config.
24    pub fn new(handle: Box<dyn Notify>) -> Self {
25        Self::builder().build(handle)
26    }
27
28    /// Create a builder for [`Runtime`].
29    pub fn builder() -> RuntimeBuilder {
30        RuntimeBuilder::new()
31    }
32
33    #[allow(clippy::arc_with_non_send_sync)]
34    fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
35        Self {
36            stop: Cell::new(false),
37            queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
38        }
39    }
40
41    /// Perform a function on the current runtime.
42    ///
43    /// ## Panics
44    ///
45    /// This method will panic if there are no running [`Runtime`].
46    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
47        #[cold]
48        fn not_in_neon_runtime() -> ! {
49            panic!("not in a neon runtime")
50        }
51
52        if CURRENT_RUNTIME.is_set() {
53            CURRENT_RUNTIME.with(f)
54        } else {
55            not_in_neon_runtime()
56        }
57    }
58
59    #[inline]
60    /// Get handle for current runtime
61    pub fn handle(&self) -> Handle {
62        Handle {
63            queue: self.queue.clone(),
64        }
65    }
66
67    /// Spawns a new asynchronous task, returning a [`Task`] for it.
68    ///
69    /// Spawning a task enables the task to execute concurrently to other tasks.
70    /// There is no guarantee that a spawned task will execute to completion.
71    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
72        unsafe { self.spawn_unchecked(future) }
73    }
74
75    /// Spawns a new asynchronous task, returning a [`Task`] for it.
76    ///
77    /// # Safety
78    ///
79    /// The caller should ensure the captured lifetime is long enough.
80    pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
81        let queue = self.queue.clone();
82        let schedule = move |runnable| {
83            queue.schedule(runnable);
84        };
85        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
86        runnable.schedule();
87        JoinHandle::new(task)
88    }
89
90    /// Poll runtime and run active tasks
91    pub fn poll(&self) -> PollResult {
92        if self.stop.get() {
93            PollResult::Ready
94        } else if self.queue.run() {
95            PollResult::PollAgain
96        } else {
97            PollResult::Pending
98        }
99    }
100
101    /// Runs the provided future
102    ///
103    /// Blocking the current thread until the future completes
104    ///
105    /// # Panics
106    ///
107    /// Call may panic if driver fails to run provided future
108    pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
109        self.stop.set(false);
110
111        CURRENT_RUNTIME.set(self, || {
112            let mut result = None;
113            unsafe {
114                self.spawn_unchecked(async {
115                    result = Some(future.await);
116                    self.stop.set(true);
117                    let _ = self.queue.handle.notify();
118                });
119            }
120
121            driver.run(self).expect("Driver failed");
122            result.expect("Driver failed to poll")
123        })
124    }
125}
126
127impl Drop for Runtime {
128    fn drop(&mut self) {
129        CURRENT_RUNTIME.set(self, || {
130            self.queue.clear();
131        });
132    }
133}
134
#[derive(Debug)]
/// Handle to a runtime.
///
/// Obtained from [`Runtime::handle`] or [`Handle::current`]. It shares the
/// runtime's task queue and can be used to wake the runtime up or to spawn
/// `Send` futures onto it.
pub struct Handle {
    // Same queue instance the originating `Runtime` owns.
    queue: Arc<RunnableQueue>,
}
140
141impl Handle {
142    /// Get handle for current runtime
143    ///
144    /// Panics if runtime is not set
145    pub fn current() -> Handle {
146        Runtime::with_current(Runtime::handle)
147    }
148
149    /// Wake up runtime
150    pub fn notify(&self) -> io::Result<()> {
151        self.queue.handle.notify()
152    }
153
154    /// Spawns a new asynchronous task, returning a [`Task`] for it.
155    ///
156    /// Spawning a task enables the task to execute concurrently to other tasks.
157    /// There is no guarantee that a spawned task will execute to completion.
158    pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
159        let queue = self.queue.clone();
160        let schedule = move |runnable| {
161            queue.schedule(runnable);
162        };
163        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
164        runnable.schedule();
165        JoinHandle::new(task)
166    }
167}
168
169impl Clone for Handle {
170    fn clone(&self) -> Self {
171        Self {
172            queue: self.queue.clone(),
173        }
174    }
175}
176
#[derive(Debug)]
// Task queue behind a `Runtime`: one unsynchronised queue for wake-ups coming from
// the thread that created it, plus two concurrent queues for wake-ups coming from
// other threads.
struct RunnableQueue {
    // Id of the thread that constructed the queue; `schedule` compares against it
    // to choose between the local and the cross-thread path.
    id: thread::ThreadId,
    // `true` after a `run` pass that left nothing queued; while it is set, the next
    // same-thread `schedule` clears it and notifies the driver.
    idle: Cell<bool>,
    // Notifier used to wake the driver up.
    handle: Box<dyn Notify>,
    // Upper bound on the tasks taken per `run` call from `local_queue`, and
    // separately from `sync_queue`.
    event_interval: usize,
    // Tasks scheduled from the creating thread; accessed without synchronisation.
    local_queue: UnsafeCell<VecDeque<Runnable>>,
    // First choice for tasks scheduled from other threads (128-slot buffers).
    sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
    // Overflow for cross-thread tasks when `sync_fixed_queue` lacks capacity.
    sync_queue: SegQueue<Runnable>,
}
187
// SAFETY (as far as this file shows): the non-`Sync` fields `idle` and `local_queue`
// are only touched by `schedule` when the calling thread's id equals `id`, and by
// `run`/`clear`; `schedule` calls from any other thread go through the concurrent
// `sync_fixed_queue` / `sync_queue` instead.
// NOTE(review): nothing visible here enforces that `run` and `clear` are invoked on
// the creating thread, nor that the boxed `Notify` is itself thread-safe — confirm.
unsafe impl Send for RunnableQueue {}
unsafe impl Sync for RunnableQueue {}
190
191impl RunnableQueue {
192    fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
193        Self {
194            handle,
195            event_interval,
196            id: thread::current().id(),
197            idle: Cell::new(true),
198            local_queue: UnsafeCell::new(VecDeque::new()),
199            sync_fixed_queue: Queue::default(),
200            sync_queue: SegQueue::new(),
201        }
202    }
203
204    fn schedule(&self, runnable: Runnable) {
205        if self.id == thread::current().id() {
206            unsafe { (*self.local_queue.get()).push_back(runnable) };
207            if self.idle.get() {
208                self.idle.set(false);
209                self.handle.notify().ok();
210            }
211        } else {
212            let result = self.sync_fixed_queue.try_enqueue([runnable]);
213            if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
214                self.sync_queue.push(runnable);
215            }
216            self.handle.notify().ok();
217        }
218    }
219
220    fn run(&self) -> bool {
221        for _ in 0..self.event_interval {
222            let task = unsafe { (*self.local_queue.get()).pop_front() };
223            if let Some(task) = task {
224                task.run();
225            } else {
226                break;
227            }
228        }
229
230        if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
231            for task in buf {
232                task.run();
233            }
234        }
235
236        for _ in 0..self.event_interval {
237            if !self.sync_queue.is_empty()
238                && let Some(task) = self.sync_queue.pop()
239            {
240                task.run();
241                continue;
242            }
243            break;
244        }
245
246        let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
247            || !self.sync_fixed_queue.is_empty()
248            || !self.sync_queue.is_empty();
249
250        if !more_tasks {
251            self.idle.set(true);
252        }
253        more_tasks
254    }
255
256    fn clear(&self) {
257        while self.sync_queue.pop().is_some() {}
258        while self.sync_fixed_queue.try_dequeue().is_ok() {}
259        unsafe { (*self.local_queue.get()).clear() };
260    }
261}
262
/// Builder for [`Runtime`].
///
/// Create one with [`RuntimeBuilder::new`] (or [`Runtime::builder`]), adjust the
/// settings, then call [`RuntimeBuilder::build`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    // Passed to the runtime's queue at build time, where it bounds how many tasks
    // are taken from a queue per polling pass.
    event_interval: usize,
}
268
269impl Default for RuntimeBuilder {
270    fn default() -> Self {
271        Self::new()
272    }
273}
274
275impl RuntimeBuilder {
276    /// Create the builder with default config.
277    pub fn new() -> Self {
278        Self { event_interval: 61 }
279    }
280
281    /// Sets the number of scheduler ticks after which the scheduler will poll
282    /// for external events (timers, I/O, and so on).
283    ///
284    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
285    pub fn event_interval(&mut self, val: usize) -> &mut Self {
286        self.event_interval = val;
287        self
288    }
289
290    /// Build [`Runtime`].
291    pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
292        Runtime::with_builder(self, handle)
293    }
294}