// compio_executor/lib.rs

//! Executor for compio runtime.

#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(unused_features)]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![doc(
    html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]
#![doc(
    html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
)]

use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};

use crate::queue::{TaskId, TaskQueue};

mod join_handle;
mod queue;
mod task;
mod util;
mod waker;

use compio_log::{instrument, trace};
use compio_send_wrapper::SendWrapper;
use crossbeam_queue::ArrayQueue;
pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
use util::panic_guard;

30cfg_if::cfg_if! {
31    if #[cfg(loom)] {
32        use loom::cell::UnsafeCell;
33        use loom::hint;
34        use loom::thread::yield_now;
35        use loom::sync::atomic::*;
36    } else {
37        use std::hint;
38        use std::thread::yield_now;
39        use std::sync::atomic::*;
40
41        #[repr(transparent)]
42        struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
43
44        impl<T> UnsafeCell<T> {
45            pub fn new(value: T) -> Self {
46                Self(std::cell::UnsafeCell::new(value))
47            }
48
49            #[inline(always)]
50            pub fn with_mut<F, R>(&self, f: F) -> R
51            where
52                F: FnOnce(*mut T) -> R,
53            {
54                f(self.0.get())
55            }
56
57            #[inline(always)]
58            pub fn with<F, R>(&self, f: F) -> R
59            where
60                F: FnOnce(*const T) -> R,
61            {
62                f(self.0.get())
63            }
64        }
65    }
66}
67
68pub(crate) type PanicResult<T> = Result<T, Panic>;
69pub(crate) type Panic = Box<dyn Any + Send + 'static>;
70
71/// A dual-queue executor optimized for singlethreaded usecase, with support for
72/// multithreaded wakes.
73///
74/// Same-thread wakes ([`Waker::wake`]) will schedule tasks within the queue
75/// directly; cross-thread wakes will send task id's to a channel, and
76/// piggybacked to singlethreaded wakes or ticks. This ensures maximum
77/// performance for singlethreaded scenario at the trade-off of worse tail
78/// latency for multithreaded wake-ups.
79///
80/// Optionally, all [`Waker`]s generated from this executor can contain an extra
81/// data, parameterized as `E`.
82///
83/// [`Waker`]: std::task::Waker
84/// [`Waker::wake`]: std::task::Waker::wake
85#[derive(Debug)]
86pub struct Executor {
87    ptr: NonNull<Shared>,
88    config: ExecutorConfig,
89}
90
91/// Configuration for [`Executor`].
92#[derive(Debug, Clone)]
93pub struct ExecutorConfig {
94    /// The size of the sync queue, which holds task id's for cross-thread
95    /// wakes.
96    ///
97    /// This is fixed and will create backpressure when full.
98    pub sync_queue_size: usize,
99
100    /// The size of the local queues, which hold tasks for same-thread
101    /// execution.
102    ///
103    /// This is dynamically resized to avoid blocking.
104    pub local_queue_size: usize,
105
106    /// The maximum number of hot tasks to run in each tick.
107    pub max_interval: u32,
108
109    /// A waker to be waken when a task is scheduled from other thread.
110    ///
111    /// This is useful for waking up drivers that switch to kernel state when
112    /// idle.
113    ///
114    /// Enable `notify-always` feature to wake this waker on every schedule,
115    /// even if the executor is already awake.
116    pub waker: Option<Waker>,
117}
118
119impl Default for ExecutorConfig {
120    fn default() -> Self {
121        Self {
122            sync_queue_size: 64,
123            local_queue_size: 64,
124            max_interval: 61,
125            waker: None,
126        }
127    }
128}
129
130pub(crate) struct Shared {
131    waker: Option<Waker>,
132    sync: ArrayQueue<TaskId>,
133    queue: SendWrapper<TaskQueue>,
134}
135
136impl Executor {
137    /// Create a new executor.
138    pub fn new() -> Self {
139        Self::with_config(ExecutorConfig::default())
140    }
141
142    /// Create a new executor with config.
143    pub fn with_config(mut config: ExecutorConfig) -> Self {
144        let ptr = Box::into_raw(Box::new(Shared {
145            waker: config.waker.take(),
146            sync: ArrayQueue::new(config.sync_queue_size),
147            queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
148        }));
149
150        Self {
151            config,
152            ptr: unsafe { NonNull::new_unchecked(ptr) },
153        }
154    }
155
156    /// Spawn a future onto the executor.
157    pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
158        let shared = self.shared();
159        let tracker = shared.queue.tracker();
160        // SAFETY: Executor cannot be sent to ther thread
161        let queue = unsafe { shared.queue.get_unchecked() };
162        let task = queue.insert(self.ptr, tracker, fut);
163
164        JoinHandle::new(task)
165    }
166
167    /// Retrieve all sync tasks, schedule those to the tail of `hot` queue
168    /// and run at most [`max_interval`] tasks.
169    ///
170    /// Running start with `hot` tasks, then `cold` ones. Finished tasks will
171    /// be pushed back to tail of `cold` queue.
172    ///
173    /// Return whether there are still hot tasks after the tick.
174    ///
175    /// [`max_interval`]: ExecutorConfig::max_interval
176    pub fn tick(&self) -> bool {
177        let queue = self.queue();
178
179        while let Some(id) = self.shared().sync.pop() {
180            queue.make_hot(id);
181        }
182
183        for id in queue.iter_hot().take(self.config.max_interval as _) {
184            queue.make_cold(id);
185            let task = queue.take(id).expect("Task was not reset back");
186            let res = unsafe { task.run() };
187            if res.is_ready() {
188                // SAFETY: We're removing it soon, so drop will only be called once.
189                // The shared pointer is kept valid until the Executor is dropped,
190                // to avoid use-after-free issues with concurrent wakers.
191                unsafe { task.drop() };
192                queue.remove(id);
193            } else {
194                queue.reset(id, task);
195            }
196        }
197
198        queue.has_hot()
199    }
200
201    /// Check if there's still scheduled task that needs to be ran.
202    #[doc(hidden)]
203    pub fn has_task(&self) -> bool {
204        self.queue().hot_head().is_some()
205    }
206
207    /// Clear the executor, drop all tasks.
208    ///
209    /// This should be called only in context of the runtime, if any future may
210    /// use it. Any panic happened during dropping the future will cause the
211    /// process to abort. If this was not called before dropping, all tasks will
212    /// be leakded.
213    pub fn clear(&self) {
214        instrument!(compio_log::Level::TRACE, "Executor::drop");
215        trace!("Dropping Executor");
216
217        while self.shared().sync.pop().is_some() {}
218        unsafe { self.queue().clear() };
219    }
220
221    #[inline(always)]
222    fn shared(&self) -> &Shared {
223        unsafe { self.ptr.as_ref() }
224    }
225
226    #[inline(always)]
227    fn queue(&self) -> &TaskQueue {
228        // SAFETY: Executor is single threaded
229        unsafe { self.shared().queue.get_unchecked() }
230    }
231}
232
233impl Drop for Executor {
234    fn drop(&mut self) {
235        self.clear();
236        unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
237    }
238}
239
240impl Default for Executor {
241    fn default() -> Self {
242        Self::new()
243    }
244}