Skip to main content

ntex_rt/
rt.rs

1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13#[derive(Debug)]
14/// The async runtime for ntex
15///
16/// It is a thread local runtime, and cannot be sent to other threads.
17pub struct Runtime {
18    stop: Cell<bool>,
19    queue: Arc<RunnableQueue>,
20}
21
22impl Runtime {
23    /// Create [`Runtime`] with default config.
24    pub fn new(handle: Box<dyn Notify>) -> Self {
25        Self::builder().build(handle)
26    }
27
28    /// Create a builder for [`Runtime`].
29    pub fn builder() -> RuntimeBuilder {
30        RuntimeBuilder::new()
31    }
32
33    #[allow(clippy::arc_with_non_send_sync)]
34    fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
35        Self {
36            stop: Cell::new(false),
37            queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
38        }
39    }
40
41    /// Perform a function on the current runtime.
42    ///
43    /// ## Panics
44    ///
45    /// This method will panic if there are no running [`Runtime`].
46    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
47        #[cold]
48        fn not_in_neon_runtime() -> ! {
49            panic!("not in a neon runtime")
50        }
51
52        if CURRENT_RUNTIME.is_set() {
53            CURRENT_RUNTIME.with(f)
54        } else {
55            not_in_neon_runtime()
56        }
57    }
58
59    #[inline]
60    /// Get handle for current runtime
61    pub fn handle(&self) -> Handle {
62        Handle {
63            queue: self.queue.clone(),
64        }
65    }
66
67    /// Spawns a new asynchronous task, returning a [`Task`] for it.
68    ///
69    /// Spawning a task enables the task to execute concurrently to other tasks.
70    /// There is no guarantee that a spawned task will execute to completion.
71    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
72        unsafe { self.spawn_unchecked(future) }
73    }
74
75    /// Spawns a new asynchronous task, returning a [`Task`] for it.
76    ///
77    /// # Safety
78    ///
79    /// The caller should ensure the captured lifetime is long enough.
80    pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
81        let queue = self.queue.clone();
82        let schedule = move |runnable| {
83            queue.schedule(runnable);
84        };
85        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
86        runnable.schedule();
87        JoinHandle::new(task)
88    }
89
90    /// Poll runtime and run active tasks
91    pub fn poll(&self) -> PollResult {
92        if self.stop.get() {
93            PollResult::Ready
94        } else if self.queue.run() {
95            PollResult::PollAgain
96        } else {
97            PollResult::Pending
98        }
99    }
100
101    /// Runs the provided future
102    ///
103    /// Blocking the current thread until the future completes
104    ///
105    /// # Panics
106    ///
107    /// Call may panic if driver fails to run provided future
108    pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
109        self.stop.set(false);
110
111        CURRENT_RUNTIME.set(self, || {
112            let mut result = None;
113            unsafe {
114                self.spawn_unchecked(async {
115                    result = Some(future.await);
116                    self.stop.set(true);
117                    let _ = self.queue.handle.notify();
118                });
119            }
120
121            ntex_error::set_backtrace_start_alt("src/raw.rs", 0);
122            driver.run(self).expect("Driver failed");
123            result.expect("Driver failed to poll")
124        })
125    }
126}
127
128impl Drop for Runtime {
129    fn drop(&mut self) {
130        CURRENT_RUNTIME.set(self, || {
131            self.queue.clear();
132        });
133    }
134}
135
136#[derive(Debug)]
137/// Handle for current runtime
138pub struct Handle {
139    queue: Arc<RunnableQueue>,
140}
141
142impl Handle {
143    /// Get handle for current runtime
144    ///
145    /// Panics if runtime is not set
146    pub fn current() -> Handle {
147        Runtime::with_current(Runtime::handle)
148    }
149
150    /// Wake up runtime
151    pub fn notify(&self) -> io::Result<()> {
152        self.queue.handle.notify()
153    }
154
155    /// Spawns a new asynchronous task, returning a [`Task`] for it.
156    ///
157    /// Spawning a task enables the task to execute concurrently to other tasks.
158    /// There is no guarantee that a spawned task will execute to completion.
159    pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
160        let queue = self.queue.clone();
161        let schedule = move |runnable| {
162            queue.schedule(runnable);
163        };
164        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
165        runnable.schedule();
166        JoinHandle::new(task)
167    }
168}
169
170impl Clone for Handle {
171    fn clone(&self) -> Self {
172        Self {
173            queue: self.queue.clone(),
174        }
175    }
176}
177
178#[derive(Debug)]
179struct RunnableQueue {
180    id: thread::ThreadId,
181    idle: Cell<bool>,
182    handle: Box<dyn Notify>,
183    event_interval: usize,
184    local_queue: UnsafeCell<VecDeque<Runnable>>,
185    sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
186    sync_queue: SegQueue<Runnable>,
187}
188
189unsafe impl Send for RunnableQueue {}
190unsafe impl Sync for RunnableQueue {}
191
192impl RunnableQueue {
193    fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
194        Self {
195            handle,
196            event_interval,
197            id: thread::current().id(),
198            idle: Cell::new(true),
199            local_queue: UnsafeCell::new(VecDeque::new()),
200            sync_fixed_queue: Queue::default(),
201            sync_queue: SegQueue::new(),
202        }
203    }
204
205    fn schedule(&self, runnable: Runnable) {
206        if self.id == thread::current().id() {
207            unsafe { (*self.local_queue.get()).push_back(runnable) };
208            if self.idle.get() {
209                self.idle.set(false);
210                self.handle.notify().ok();
211            }
212        } else {
213            let result = self.sync_fixed_queue.try_enqueue([runnable]);
214            if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
215                self.sync_queue.push(runnable);
216            }
217            self.handle.notify().ok();
218        }
219    }
220
221    fn run(&self) -> bool {
222        for _ in 0..self.event_interval {
223            let task = unsafe { (*self.local_queue.get()).pop_front() };
224            if let Some(task) = task {
225                task.run();
226            } else {
227                break;
228            }
229        }
230
231        if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
232            for task in buf {
233                task.run();
234            }
235        }
236
237        for _ in 0..self.event_interval {
238            if !self.sync_queue.is_empty()
239                && let Some(task) = self.sync_queue.pop()
240            {
241                task.run();
242                continue;
243            }
244            break;
245        }
246
247        let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
248            || !self.sync_fixed_queue.is_empty()
249            || !self.sync_queue.is_empty();
250
251        if !more_tasks {
252            self.idle.set(true);
253        }
254        more_tasks
255    }
256
257    fn clear(&self) {
258        while self.sync_queue.pop().is_some() {}
259        while self.sync_fixed_queue.try_dequeue().is_ok() {}
260        unsafe { (*self.local_queue.get()).clear() };
261    }
262}
263
264/// Builder for [`Runtime`].
265#[derive(Debug, Clone)]
266pub struct RuntimeBuilder {
267    event_interval: usize,
268}
269
270impl Default for RuntimeBuilder {
271    fn default() -> Self {
272        Self::new()
273    }
274}
275
276impl RuntimeBuilder {
277    /// Create the builder with default config.
278    pub fn new() -> Self {
279        Self { event_interval: 61 }
280    }
281
282    /// Sets the number of scheduler ticks after which the scheduler will poll
283    /// for external events (timers, I/O, and so on).
284    ///
285    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
286    pub fn event_interval(&mut self, val: usize) -> &mut Self {
287        self.event_interval = val;
288        self
289    }
290
291    /// Build [`Runtime`].
292    pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
293        Runtime::with_builder(self, handle)
294    }
295}