ntex_rt/
rt.rs

1use std::cell::{Cell, UnsafeCell};
2use std::collections::VecDeque;
3use std::{future::Future, io, sync::Arc, thread};
4
5use async_task::Runnable;
6use crossbeam_queue::SegQueue;
7use swap_buffer_queue::{Queue, buffer::ArrayBuffer, error::TryEnqueueError};
8
9use crate::{driver::Driver, driver::Notify, driver::PollResult, handle::JoinHandle};
10
11scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
12
13/// The async runtime for ntex
14///
15/// It is a thread local runtime, and cannot be sent to other threads.
16pub struct Runtime {
17    stop: Cell<bool>,
18    queue: Arc<RunnableQueue>,
19}
20
21impl Runtime {
22    /// Create [`Runtime`] with default config.
23    pub fn new(handle: Box<dyn Notify>) -> Self {
24        Self::builder().build(handle)
25    }
26
27    /// Create a builder for [`Runtime`].
28    pub fn builder() -> RuntimeBuilder {
29        RuntimeBuilder::new()
30    }
31
32    #[allow(clippy::arc_with_non_send_sync)]
33    fn with_builder(builder: &RuntimeBuilder, handle: Box<dyn Notify>) -> Self {
34        Self {
35            stop: Cell::new(false),
36            queue: Arc::new(RunnableQueue::new(builder.event_interval, handle)),
37        }
38    }
39
40    /// Perform a function on the current runtime.
41    ///
42    /// ## Panics
43    ///
44    /// This method will panic if there are no running [`Runtime`].
45    pub fn with_current<T, F: FnOnce(&Self) -> T>(f: F) -> T {
46        #[cold]
47        fn not_in_neon_runtime() -> ! {
48            panic!("not in a neon runtime")
49        }
50
51        if CURRENT_RUNTIME.is_set() {
52            CURRENT_RUNTIME.with(f)
53        } else {
54            not_in_neon_runtime()
55        }
56    }
57
58    #[inline]
59    /// Get handle for current runtime
60    pub fn handle(&self) -> Handle {
61        Handle {
62            queue: self.queue.clone(),
63        }
64    }
65
66    /// Spawns a new asynchronous task, returning a [`Task`] for it.
67    ///
68    /// Spawning a task enables the task to execute concurrently to other tasks.
69    /// There is no guarantee that a spawned task will execute to completion.
70    pub fn spawn<F: Future + 'static>(&self, future: F) -> JoinHandle<F::Output> {
71        unsafe { self.spawn_unchecked(future) }
72    }
73
74    /// Spawns a new asynchronous task, returning a [`Task`] for it.
75    ///
76    /// # Safety
77    ///
78    /// The caller should ensure the captured lifetime is long enough.
79    pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> JoinHandle<F::Output> {
80        let queue = self.queue.clone();
81        let schedule = move |runnable| {
82            queue.schedule(runnable);
83        };
84        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
85        runnable.schedule();
86        JoinHandle::new(task)
87    }
88
89    /// Poll runtime and run active tasks
90    pub fn poll(&self) -> PollResult {
91        if self.stop.get() {
92            PollResult::Ready
93        } else if self.queue.run() {
94            PollResult::PollAgain
95        } else {
96            PollResult::Pending
97        }
98    }
99
100    /// Runs the provided future, blocking the current thread until the future
101    /// completes
102    pub fn block_on<F: Future>(&self, future: F, driver: &dyn Driver) -> F::Output {
103        self.stop.set(false);
104
105        CURRENT_RUNTIME.set(self, || {
106            let mut result = None;
107            unsafe {
108                self.spawn_unchecked(async {
109                    result = Some(future.await);
110                    self.stop.set(true);
111                    let _ = self.queue.handle.notify();
112                });
113            }
114
115            driver.run(self).expect("Driver failed");
116            result.expect("Driver failed to poll")
117        })
118    }
119}
120
121impl Drop for Runtime {
122    fn drop(&mut self) {
123        CURRENT_RUNTIME.set(self, || {
124            self.queue.clear();
125        })
126    }
127}
128
129#[derive(Debug)]
130/// Handle for current runtime
131pub struct Handle {
132    queue: Arc<RunnableQueue>,
133}
134
135impl Handle {
136    /// Get handle for current runtime
137    ///
138    /// Panics if runtime is not set
139    pub fn current() -> Handle {
140        Runtime::with_current(|rt| rt.handle())
141    }
142
143    /// Wake up runtime
144    pub fn notify(&self) -> io::Result<()> {
145        self.queue.handle.notify()
146    }
147
148    /// Spawns a new asynchronous task, returning a [`Task`] for it.
149    ///
150    /// Spawning a task enables the task to execute concurrently to other tasks.
151    /// There is no guarantee that a spawned task will execute to completion.
152    pub fn spawn<F: Future + Send + 'static>(&self, future: F) -> JoinHandle<F::Output> {
153        let queue = self.queue.clone();
154        let schedule = move |runnable| {
155            queue.schedule(runnable);
156        };
157        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
158        runnable.schedule();
159        JoinHandle::new(task)
160    }
161}
162
163impl Clone for Handle {
164    fn clone(&self) -> Self {
165        Self {
166            queue: self.queue.clone(),
167        }
168    }
169}
170
171#[derive(Debug)]
172struct RunnableQueue {
173    id: thread::ThreadId,
174    idle: Cell<bool>,
175    handle: Box<dyn Notify>,
176    event_interval: usize,
177    local_queue: UnsafeCell<VecDeque<Runnable>>,
178    sync_fixed_queue: Queue<ArrayBuffer<Runnable, 128>>,
179    sync_queue: SegQueue<Runnable>,
180}
181
182unsafe impl Send for RunnableQueue {}
183unsafe impl Sync for RunnableQueue {}
184
185impl RunnableQueue {
186    fn new(event_interval: usize, handle: Box<dyn Notify>) -> Self {
187        Self {
188            handle,
189            event_interval,
190            id: thread::current().id(),
191            idle: Cell::new(true),
192            local_queue: UnsafeCell::new(VecDeque::new()),
193            sync_fixed_queue: Queue::default(),
194            sync_queue: SegQueue::new(),
195        }
196    }
197
198    fn schedule(&self, runnable: Runnable) {
199        if self.id == thread::current().id() {
200            unsafe { (*self.local_queue.get()).push_back(runnable) };
201            if self.idle.get() {
202                self.idle.set(false);
203                self.handle.notify().ok();
204            }
205        } else {
206            let result = self.sync_fixed_queue.try_enqueue([runnable]);
207            if let Err(TryEnqueueError::InsufficientCapacity([runnable])) = result {
208                self.sync_queue.push(runnable);
209            }
210            self.handle.notify().ok();
211        }
212    }
213
214    fn run(&self) -> bool {
215        self.idle.set(false);
216
217        for _ in 0..self.event_interval {
218            let task = unsafe { (*self.local_queue.get()).pop_front() };
219            if let Some(task) = task {
220                task.run();
221            } else {
222                break;
223            }
224        }
225
226        if let Ok(buf) = self.sync_fixed_queue.try_dequeue() {
227            for task in buf {
228                task.run();
229            }
230        }
231
232        for _ in 0..self.event_interval {
233            if !self.sync_queue.is_empty() {
234                if let Some(task) = self.sync_queue.pop() {
235                    task.run();
236                    continue;
237                }
238            }
239            break;
240        }
241        self.idle.set(true);
242
243        !unsafe { (*self.local_queue.get()).is_empty() }
244            || !self.sync_fixed_queue.is_empty()
245            || !self.sync_queue.is_empty()
246    }
247
248    fn clear(&self) {
249        while self.sync_queue.pop().is_some() {}
250        while self.sync_fixed_queue.try_dequeue().is_ok() {}
251        unsafe { (*self.local_queue.get()).clear() };
252    }
253}
254
/// Configuration used to construct a [`Runtime`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    /// Number of scheduler ticks between polls for external events.
    event_interval: usize,
}
260
261impl Default for RuntimeBuilder {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267impl RuntimeBuilder {
268    /// Create the builder with default config.
269    pub fn new() -> Self {
270        Self { event_interval: 61 }
271    }
272
273    /// Sets the number of scheduler ticks after which the scheduler will poll
274    /// for external events (timers, I/O, and so on).
275    ///
276    /// A scheduler “tick” roughly corresponds to one poll invocation on a task.
277    pub fn event_interval(&mut self, val: usize) -> &mut Self {
278        self.event_interval = val;
279        self
280    }
281
282    /// Build [`Runtime`].
283    pub fn build(&self, handle: Box<dyn Notify>) -> Runtime {
284        Runtime::with_builder(self, handle)
285    }
286}