1#![cfg_attr(docsrs, feature(doc_cfg))]
4#![allow(unused_features)]
5#![warn(missing_docs)]
6#![deny(rustdoc::broken_intra_doc_links)]
7#![doc(
8 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
9)]
10#![doc(
11 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13
14use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
15
16use crate::queue::{TaskId, TaskQueue};
17
18mod join_handle;
19mod queue;
20mod task;
21mod util;
22mod waker;
23
24use compio_log::{instrument, trace};
25use compio_send_wrapper::SendWrapper;
26use crossbeam_queue::ArrayQueue;
27pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
28use util::panic_guard;
29
30cfg_if::cfg_if! {
31 if #[cfg(loom)] {
32 use loom::cell::UnsafeCell;
33 use loom::hint;
34 use loom::thread::yield_now;
35 use loom::sync::atomic::*;
36 } else {
37 use std::hint;
38 use std::thread::yield_now;
39 use std::sync::atomic::*;
40
41 #[repr(transparent)]
42 struct UnsafeCell<T>(std::cell::UnsafeCell<T>);
43
44 impl<T> UnsafeCell<T> {
45 pub fn new(value: T) -> Self {
46 Self(std::cell::UnsafeCell::new(value))
47 }
48
49 #[inline(always)]
50 pub fn with_mut<F, R>(&self, f: F) -> R
51 where
52 F: FnOnce(*mut T) -> R,
53 {
54 f(self.0.get())
55 }
56
57 #[inline(always)]
58 pub fn with<F, R>(&self, f: F) -> R
59 where
60 F: FnOnce(*const T) -> R,
61 {
62 f(self.0.get())
63 }
64 }
65 }
66}
67
/// A `Result` whose error variant carries a [`Panic`] payload.
pub(crate) type PanicResult<T> = Result<T, Panic>;
/// A boxed, type-erased, sendable value — the same shape as the error type in
/// [`std::thread::Result`].
///
/// NOTE(review): presumably the payload captured from a panicking task;
/// confirm against the `task` / `join_handle` modules.
pub(crate) type Panic = Box<dyn Any + Send + 'static>;
70
/// A task executor driven by calling [`Executor::tick`].
///
/// The executor owns a heap-allocated block of shared state (`Shared`) through
/// a raw pointer: it is allocated in [`Executor::with_config`] and freed when
/// the executor is dropped. Because the pointer is a [`NonNull`], this type is
/// neither `Send` nor `Sync` unless an explicit unsafe impl exists elsewhere.
#[derive(Debug)]
pub struct Executor {
    // Pointer to the shared state; a copy is also handed to every spawned task.
    ptr: NonNull<Shared>,
    // Configuration the executor was built with (its `waker` has been moved
    // into the shared state, so it is `None` here).
    config: ExecutorConfig,
}
90
/// Configuration for an [`Executor`].
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Capacity of the bounded queue of task ids that [`Executor::tick`]
    /// drains before running tasks.
    ///
    /// Must be non-zero: the queue is a `crossbeam_queue::ArrayQueue`, whose
    /// constructor panics on a zero capacity.
    ///
    /// NOTE(review): presumably this queue receives wake-ups issued from other
    /// threads; confirm against the `waker` module.
    pub sync_queue_size: usize,

    /// Size passed to the local task queue when it is created.
    ///
    /// NOTE(review): whether this is an initial capacity or a hard limit is
    /// decided by `TaskQueue::new`, which is not visible here.
    pub local_queue_size: usize,

    /// Maximum number of ready ("hot") tasks run by a single call to
    /// [`Executor::tick`].
    pub max_interval: u32,

    /// Optional waker moved into the executor's shared state on construction.
    ///
    /// NOTE(review): presumably invoked to notify the outer driver when a task
    /// is woken; confirm against the `waker` module.
    pub waker: Option<Waker>,
}
118
119impl Default for ExecutorConfig {
120 fn default() -> Self {
121 Self {
122 sync_queue_size: 64,
123 local_queue_size: 64,
124 max_interval: 61,
125 waker: None,
126 }
127 }
128}
129
/// State shared between an `Executor` and the tasks it spawns.
///
/// Allocated on the heap by `Executor::with_config`; a pointer to it is passed
/// to every task inserted into the queue.
pub(crate) struct Shared {
    // Waker taken from `ExecutorConfig::waker`, if one was supplied.
    waker: Option<Waker>,
    // Bounded queue of task ids; drained at the start of every tick, where
    // each id is marked hot.
    // NOTE(review): presumably the cross-thread wake path — confirm in `waker`.
    sync: ArrayQueue<TaskId>,
    // The task queue itself, wrapped so that `Shared` can be handled across
    // threads; it is accessed through `get_unchecked` by the executor.
    queue: SendWrapper<TaskQueue>,
}
135
136impl Executor {
137 pub fn new() -> Self {
139 Self::with_config(ExecutorConfig::default())
140 }
141
142 pub fn with_config(mut config: ExecutorConfig) -> Self {
144 let ptr = Box::into_raw(Box::new(Shared {
145 waker: config.waker.take(),
146 sync: ArrayQueue::new(config.sync_queue_size),
147 queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
148 }));
149
150 Self {
151 config,
152 ptr: unsafe { NonNull::new_unchecked(ptr) },
153 }
154 }
155
156 pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
158 let shared = self.shared();
159 let tracker = shared.queue.tracker();
160 let queue = unsafe { shared.queue.get_unchecked() };
162 let task = queue.insert(self.ptr, tracker, fut);
163
164 JoinHandle::new(task)
165 }
166
167 pub fn tick(&self) -> bool {
177 let queue = self.queue();
178
179 while let Some(id) = self.shared().sync.pop() {
180 queue.make_hot(id);
181 }
182
183 for id in queue.iter_hot().take(self.config.max_interval as _) {
184 queue.make_cold(id);
185 let task = queue.take(id).expect("Task was not reset back");
186 let res = unsafe { task.run() };
187 if res.is_ready() {
188 unsafe { task.drop() };
192 queue.remove(id);
193 } else {
194 queue.reset(id, task);
195 }
196 }
197
198 queue.has_hot()
199 }
200
201 #[doc(hidden)]
203 pub fn has_task(&self) -> bool {
204 self.queue().hot_head().is_some()
205 }
206
207 pub fn clear(&self) {
214 instrument!(compio_log::Level::TRACE, "Executor::drop");
215 trace!("Dropping Executor");
216
217 while self.shared().sync.pop().is_some() {}
218 unsafe { self.queue().clear() };
219 }
220
221 #[inline(always)]
222 fn shared(&self) -> &Shared {
223 unsafe { self.ptr.as_ref() }
224 }
225
226 #[inline(always)]
227 fn queue(&self) -> &TaskQueue {
228 unsafe { self.shared().queue.get_unchecked() }
230 }
231}
232
233impl Drop for Executor {
234 fn drop(&mut self) {
235 self.clear();
236 unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
237 }
238}
239
240impl Default for Executor {
241 fn default() -> Self {
242 Self::new()
243 }
244}