Skip to main content

compio_executor/
lib.rs

1//! Executor for compio runtime.
2
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![allow(unused_features)]
5#![warn(missing_docs)]
6#![deny(rustdoc::broken_intra_doc_links)]
7#![doc(
8    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
9)]
10#![doc(
11    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13
14use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
15
16use crate::queue::{TaskId, TaskQueue};
17
18mod join_handle;
19mod queue;
20mod task;
21mod util;
22mod waker;
23
24use compio_log::{instrument, trace};
25use compio_send_wrapper::SendWrapper;
26use crossbeam_queue::ArrayQueue;
27pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
28use util::panic_guard;
29
30cfg_if::cfg_if! {
31    if #[cfg(loom)] {
32        use loom::cell::UnsafeCell;
33        use loom::hint;
34        use loom::thread::yield_now;
35        use loom::sync::atomic::*;
36    } else {
37        use std::hint;
38        use std::thread::yield_now;
39        use std::sync::atomic::*;
40
41        #[repr(transparent)]
42        struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
43
44        impl<T> UnsafeCell<T> {
45            pub fn new(value: T) -> Self {
46                Self(std::cell::UnsafeCell::new(value))
47            }
48
49            #[inline(always)]
50            pub fn with_mut<F, R>(&self, f: F) -> R
51            where
52                F: FnOnce(*mut T) -> R,
53            {
54                f(self.0.get())
55            }
56
57            #[inline(always)]
58            pub fn with<F, R>(&self, f: F) -> R
59            where
60                F: FnOnce(*const T) -> R,
61            {
62                f(self.0.get())
63            }
64        }
65    }
66}
67
/// Boxed, type-erased panic payload that can be moved across threads.
pub(crate) type Panic = Box<dyn Any + Send + 'static>;

/// Result whose error variant carries a [`Panic`] payload.
pub(crate) type PanicResult<T> = Result<T, Panic>;
70
71/// A dual-queue executor optimized for singlethreaded usecase, with support for
72/// multithreaded wakes.
73///
74/// Same-thread wakes ([`Waker::wake`]) will schedule tasks within the queue
75/// directly; cross-thread wakes will send task id's to a channel, and
76/// piggybacked to singlethreaded wakes or ticks. This ensures maximum
77/// performance for singlethreaded scenario at the trade-off of worse tail
78/// latency for multithreaded wake-ups.
79///
80/// Optionally, all [`Waker`]s generated from this executor can contain an extra
81/// data, parameterized as `E`.
82///
83/// [`Waker`]: std::task::Waker
84/// [`Waker::wake`]: std::task::Waker::wake
85#[derive(Debug)]
86pub struct Executor {
87    ptr: NonNull<Shared>,
88    config: ExecutorConfig,
89}
90
/// Tunable parameters used when building an [`Executor`].
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Capacity of the sync queue, which carries the ids of tasks woken from
    /// other threads.
    ///
    /// The capacity is fixed; once the queue is full, further cross-thread
    /// wakes experience backpressure.
    pub sync_queue_size: usize,

    /// Initial size of the local queues, which hold the tasks executed on the
    /// executor's own thread.
    ///
    /// These queues are resized dynamically so that they never block.
    pub local_queue_size: usize,

    /// Upper bound on the number of hot tasks run by a single tick.
    pub max_interval: u32,

    /// Optional waker to be woken whenever a task gets scheduled.
    ///
    /// Useful for waking up drivers that switch to kernel state when idle.
    pub waker: Option<Waker>,
}
115
116impl Default for ExecutorConfig {
117    fn default() -> Self {
118        Self {
119            sync_queue_size: 64,
120            local_queue_size: 64,
121            max_interval: 61,
122            waker: None,
123        }
124    }
125}
126
127pub(crate) struct Shared {
128    waker: Option<Waker>,
129    sync: ArrayQueue<TaskId>,
130    queue: SendWrapper<TaskQueue>,
131}
132
133impl Executor {
134    /// Create a new executor.
135    pub fn new() -> Self {
136        Self::with_config(ExecutorConfig::default())
137    }
138
139    /// Create a new executor with config.
140    pub fn with_config(mut config: ExecutorConfig) -> Self {
141        let ptr = Box::into_raw(Box::new(Shared {
142            waker: config.waker.take(),
143            sync: ArrayQueue::new(config.sync_queue_size),
144            queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
145        }));
146
147        Self {
148            config,
149            ptr: unsafe { NonNull::new_unchecked(ptr) },
150        }
151    }
152
153    /// Spawn a future onto the executor.
154    pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
155        let shared = self.shared();
156        let tracker = shared.queue.tracker();
157        // SAFETY: Executor cannot be sent to ther thread
158        let queue = unsafe { shared.queue.get_unchecked() };
159        let task = queue.insert(self.ptr, tracker, fut);
160
161        JoinHandle::new(task)
162    }
163
164    /// Retrieve all sync tasks, schedule those to the tail of `hot` queue
165    /// and run at most [`max_interval`] tasks.
166    ///
167    /// Running start with `hot` tasks, then `cold` ones. Finished tasks will
168    /// be pushed back to tail of `cold` queue.
169    ///
170    /// Return whether there are still hot tasks after the tick.
171    ///
172    /// [`max_interval`]: ExecutorConfig::max_interval
173    pub fn tick(&self) -> bool {
174        let queue = self.queue();
175
176        while let Some(id) = self.shared().sync.pop() {
177            queue.make_hot(id);
178        }
179
180        for id in queue.iter_hot().take(self.config.max_interval as _) {
181            queue.make_cold(id);
182            let task = queue.take(id).expect("Task was not reset back");
183            let res = unsafe { task.run() };
184            if res.is_ready() {
185                // SAFETY: We're removing it soon, so drop will only be called once.
186                // The shared pointer is kept valid until the Executor is dropped,
187                // to avoid use-after-free issues with concurrent wakers.
188                unsafe { task.drop() };
189                queue.remove(id);
190            } else {
191                queue.reset(id, task);
192            }
193        }
194
195        queue.has_hot()
196    }
197
198    /// Check if there's still scheduled task that needs to be ran.
199    #[doc(hidden)]
200    pub fn has_task(&self) -> bool {
201        self.queue().hot_head().is_some()
202    }
203
204    /// Clear the executor, drop all tasks.
205    ///
206    /// This should be called only in context of the runtime, if any future may
207    /// use it. Any panic happened during dropping the future will cause the
208    /// process to abort. If this was not called before dropping, all tasks will
209    /// be leakded.
210    pub fn clear(&self) {
211        instrument!(compio_log::Level::TRACE, "Executor::drop");
212        trace!("Dropping Executor");
213
214        while self.shared().sync.pop().is_some() {}
215        unsafe { self.queue().clear() };
216    }
217
218    #[inline(always)]
219    fn shared(&self) -> &Shared {
220        unsafe { self.ptr.as_ref() }
221    }
222
223    #[inline(always)]
224    fn queue(&self) -> &TaskQueue {
225        // SAFETY: Executor is single threaded
226        unsafe { self.shared().queue.get_unchecked() }
227    }
228}
229
230impl Drop for Executor {
231    fn drop(&mut self) {
232        self.clear();
233        unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
234    }
235}
236
237impl Default for Executor {
238    fn default() -> Self {
239        Self::new()
240    }
241}