ntex_rt/
rt.rs

1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13/// The async runtime for ntex
14///
15/// It is a thread local runtime, and cannot be sent to other threads.
16pub struct Runtime {
17    stop: Cell<bool>,
18    queue: Arc<RunnableQueue>,
19}
20
21impl Runtime {
22    /// Create [`Runtime`] with default config.
23    pub fn new(handle: Box<dyn Notify>) -> Self {
24        Self::builder().build(handle)
25    }
26
27    /// Create a builder for [`Runtime`].
28    pub fn builder() -> RuntimeBuilder {
29        RuntimeBuilder::new()
30    }
31
32    #[allow(clippy::arc_with_non_send_sync)]
33    fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
34        Self {
35            stop: Cell::new(false),
36            queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
37        }
38    }
39
40    /// Perform a function on the current runtime.
41    ///
42    /// ## Panics
43    ///
44    /// This method will panic if there are no running [`Runtime`].
45    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
46        #[cold]
47        fn not_in_neon_runtime() -> ! {
48            panic!("not in a neon runtime")
49        }
50
51        if CURRENT_RUNTIME.is_set() {
52            CURRENT_RUNTIME.with(f)
53        } else {
54            not_in_neon_runtime()
55        }
56    }
57
58    #[inline]
59    /// Get handle for current runtime
60    pub fn handle(&self) -> Handle {
61        Handle {
62            queue: self.queue.clone(),
63        }
64    }
65
66    /// Spawns a new asynchronous task, returning a [`Task`] for it.
67    ///
68    /// Spawning a task enables the task to execute concurrently to other tasks.
69    /// There is no guarantee that a spawned task will execute to completion.
70    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
71        unsafe { self.spawn_unchecked(future) }
72    }
73
74    /// Spawns a new asynchronous task, returning a [`Task`] for it.
75    ///
76    /// # Safety
77    ///
78    /// The caller should ensure the captured lifetime is long enough.
79    pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
80        let queue = self.queue.clone();
81        let schedule = move |runnable| {
82            queue.schedule(runnable);
83        };
84        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
85        runnable.schedule();
86        JoinHandle::new(task)
87    }
88
89    /// Poll runtime and run active tasks
90    pub fn poll(&self) -> PollResult {
91        if self.stop.get() {
92            PollResult::Ready
93        } else if self.queue.run() {
94            PollResult::PollAgain
95        } else {
96            PollResult::Pending
97        }
98    }
99
100    /// Runs the provided future, blocking the current thread until the future
101    /// completes
102    pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
103        self.stop.set(false);
104
105        CURRENT_RUNTIME.set(self, || {
106            let mut result = None;
107            unsafe {
108                self.spawn_unchecked(async {
109                    result = Some(future.await);
110                    self.stop.set(true);
111                    let _ = self.queue.handle.notify();
112                });
113            }
114
115            driver.run(self).expect("Driver failed");
116            result.expect("Driver failed to poll")
117        })
118    }
119}
120
121impl Drop for Runtime {
122    fn drop(&mut self) {
123        CURRENT_RUNTIME.set(self, || {
124            self.queue.clear();
125        })
126    }
127}
128
129#[derive(Debug)]
130/// Handle for current runtime
131pub struct Handle {
132    queue: Arc<RunnableQueue>,
133}
134
135impl Handle {
136    /// Get handle for current runtime
137    ///
138    /// Panics if runtime is not set
139    pub fn current() -> Handle {
140        Runtime::with_current(|rt| rt.handle())
141    }
142
143    /// Wake up runtime
144    pub fn notify(&self) -> io::Result<()> {
145        self.queue.handle.notify()
146    }
147
148    /// Spawns a new asynchronous task, returning a [`Task`] for it.
149    ///
150    /// Spawning a task enables the task to execute concurrently to other tasks.
151    /// There is no guarantee that a spawned task will execute to completion.
152    pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
153        let queue = self.queue.clone();
154        let schedule = move |runnable| {
155            queue.schedule(runnable);
156        };
157        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
158        runnable.schedule();
159        JoinHandle::new(task)
160    }
161}
162
163impl Clone for Handle {
164    fn clone(&self) -> Self {
165        Self {
166            queue: self.queue.clone(),
167        }
168    }
169}
170
171#[derive(Debug)]
172struct RunnableQueue {
173    id: thread::ThreadId,
174    idle: Cell<bool>,
175    handle: Box<dyn Notify>,
176    event_interval: usize,
177    local_queue: UnsafeCell<VecDeque<Runnable>>,
178    sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
179    sync_queue: SegQueue<Runnable>,
180}
181
182unsafe impl Send for RunnableQueue {}
183unsafe impl Sync for RunnableQueue {}
184
185impl RunnableQueue {
186    fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
187        Self {
188            handle,
189            event_interval,
190            id: thread::current().id(),
191            idle: Cell::new(true),
192            local_queue: UnsafeCell::new(VecDeque::new()),
193            sync_fixed_queue: Queue::default(),
194            sync_queue: SegQueue::new(),
195        }
196    }
197
198    fn schedule(&self, runnable: Runnable) {
199        if self.id == thread::current().id() {
200            unsafe { (*self.local_queue.get()).push_back(runnable) };
201            if self.idle.get() {
202                self.idle.set(false);
203                self.handle.notify().ok();
204            }
205        } else {
206            let result = self.sync_fixed_queue.try_enqueue([runnable]);
207            if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
208                self.sync_queue.push(runnable);
209            }
210            self.handle.notify().ok();
211        }
212    }
213
214    fn run(&self) -> bool {
215        for _ in 0..self.event_interval {
216            let task = unsafe { (*self.local_queue.get()).pop_front() };
217            if let Some(task) = task {
218                task.run();
219            } else {
220                break;
221            }
222        }
223
224        if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
225            for task in buf {
226                task.run();
227            }
228        }
229
230        for _ in 0..self.event_interval {
231            if !self.sync_queue.is_empty() {
232                if let Some(task) = self.sync_queue.pop() {
233                    task.run();
234                    continue;
235                }
236            }
237            break;
238        }
239
240        let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
241            || !self.sync_fixed_queue.is_empty()
242            || !self.sync_queue.is_empty();
243
244        if !more_tasks {
245            self.idle.set(true);
246        }
247        more_tasks
248    }
249
250    fn clear(&self) {
251        while self.sync_queue.pop().is_some() {}
252        while self.sync_fixed_queue.try_dequeue().is_ok() {}
253        unsafe { (*self.local_queue.get()).clear() };
254    }
255}
256
257/// Builder for [`Runtime`].
258#[derive(Debug, Clone)]
259pub struct RuntimeBuilder {
260    event_interval: usize,
261}
262
263impl Default for RuntimeBuilder {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269impl RuntimeBuilder {
270    /// Create the builder with default config.
271    pub fn new() -> Self {
272        Self { event_interval: 61 }
273    }
274
275    /// Sets the number of scheduler ticks after which the scheduler will poll
276    /// for external events (timers, I/O, and so on).
277    ///
278    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
279    pub fn event_interval(&mut self, val: usize) -> &mut Self {
280        self.event_interval = val;
281        self
282    }
283
284    /// Build [`Runtime`].
285    pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
286        Runtime::with_builder(self, handle)
287    }
288}