1#![cfg_attr(docsrs, feature(doc_cfg))]
4#![allow(unused_features)]
5#![warn(missing_docs)]
6#![deny(rustdoc::broken_intra_doc_links)]
7#![doc(
8 html_logo_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
9)]
10#![doc(
11 html_favicon_url = "https://github.com/compio-rs/compio-logo/raw/refs/heads/master/generated/colored-bold.svg"
12)]
13
14use std::{any::Any, fmt::Debug, ptr::NonNull, task::Waker};
15
16use crate::queue::{TaskId, TaskQueue};
17
18mod join_handle;
19mod queue;
20mod task;
21mod util;
22mod waker;
23
24use compio_log::{instrument, trace};
25use compio_send_wrapper::SendWrapper;
26use crossbeam_queue::ArrayQueue;
27pub use join_handle::{JoinError, JoinHandle, ResumeUnwind};
28use util::panic_guard;
29
30cfg_if::cfg_if! {
31 if #[cfg(loom)] {
32 use loom::cell::UnsafeCell;
33 use loom::hint;
34 use loom::thread::yield_now;
35 use loom::sync::atomic::*;
36 } else {
37 use std::hint;
38 use std::thread::yield_now;
39 use std::sync::atomic::*;
40
/// Wrapper over [`std::cell::UnsafeCell`] whose contents are reached only
/// through the closure-taking [`with`](Self::with) and
/// [`with_mut`](Self::with_mut) accessors.
///
/// This is the non-`loom` counterpart of the `loom::cell::UnsafeCell` import
/// in the other `cfg` branch, so call sites can use one API for both.
#[repr(transparent)]
struct UnsafeCell<T>(std::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
    /// Wraps `value` in a new cell.
    pub fn new(value: T) -> Self {
        let inner = std::cell::UnsafeCell::new(value);
        Self(inner)
    }

    /// Calls `f` with a mutable raw pointer to the stored value and returns
    /// whatever `f` returns.
    ///
    /// The pointer is handed out unconditionally; dereferencing it soundly is
    /// the caller's responsibility.
    #[inline(always)]
    pub fn with_mut<F: FnOnce(*mut T) -> R, R>(&self, f: F) -> R {
        let raw: *mut T = self.0.get();
        f(raw)
    }

    /// Calls `f` with a const raw pointer to the stored value and returns
    /// whatever `f` returns.
    #[inline(always)]
    pub fn with<F: FnOnce(*const T) -> R, R>(&self, f: F) -> R {
        let raw: *const T = self.0.get();
        f(raw)
    }
}
65 }
66}
67
/// Type-erased payload carried in the `Err` side of [`PanicResult`].
pub(crate) type Panic = Box<dyn Any + Send + 'static>;

/// A `Result` whose error side is a boxed [`Panic`] payload.
pub(crate) type PanicResult<T> = Result<T, Panic>;
70
71#[derive(Debug)]
86pub struct Executor {
87 ptr: NonNull<Shared>,
88 config: ExecutorConfig,
89}
90
/// Parameters consumed by [`Executor::with_config`].
#[derive(Clone, Debug)]
pub struct ExecutorConfig {
    /// Capacity of the bounded queue of task ids that the executor drains at
    /// the start of every tick.
    pub sync_queue_size: usize,

    /// Size passed to the executor's local task queue when it is created.
    pub local_queue_size: usize,

    /// Upper bound on how many hot tasks a single [`Executor::tick`] runs.
    pub max_interval: u32,

    /// Optional waker that is moved into the executor's shared state on
    /// construction.
    // NOTE(review): presumably used to wake the driver when a task is
    // scheduled from elsewhere — confirm against the waker module.
    pub waker: Option<Waker>,
}

impl Default for ExecutorConfig {
    /// Returns a configuration with both queue sizes set to 64, a
    /// `max_interval` of 61 and no waker.
    fn default() -> Self {
        const SYNC_QUEUE_SIZE: usize = 64;
        const LOCAL_QUEUE_SIZE: usize = 64;
        const MAX_INTERVAL: u32 = 61;

        Self {
            sync_queue_size: SYNC_QUEUE_SIZE,
            local_queue_size: LOCAL_QUEUE_SIZE,
            max_interval: MAX_INTERVAL,
            waker: None,
        }
    }
}
126
127pub(crate) struct Shared {
128 waker: Option<Waker>,
129 sync: ArrayQueue<TaskId>,
130 queue: SendWrapper<TaskQueue>,
131}
132
133impl Executor {
134 pub fn new() -> Self {
136 Self::with_config(ExecutorConfig::default())
137 }
138
139 pub fn with_config(mut config: ExecutorConfig) -> Self {
141 let ptr = Box::into_raw(Box::new(Shared {
142 waker: config.waker.take(),
143 sync: ArrayQueue::new(config.sync_queue_size),
144 queue: SendWrapper::new(TaskQueue::new(config.local_queue_size)),
145 }));
146
147 Self {
148 config,
149 ptr: unsafe { NonNull::new_unchecked(ptr) },
150 }
151 }
152
153 pub fn spawn<F: Future + 'static>(&self, fut: F) -> JoinHandle<F::Output> {
155 let shared = self.shared();
156 let tracker = shared.queue.tracker();
157 let queue = unsafe { shared.queue.get_unchecked() };
159 let task = queue.insert(self.ptr, tracker, fut);
160
161 JoinHandle::new(task)
162 }
163
164 pub fn tick(&self) -> bool {
174 let queue = self.queue();
175
176 while let Some(id) = self.shared().sync.pop() {
177 queue.make_hot(id);
178 }
179
180 for id in queue.iter_hot().take(self.config.max_interval as _) {
181 queue.make_cold(id);
182 let task = queue.take(id).expect("Task was not reset back");
183 let res = unsafe { task.run() };
184 if res.is_ready() {
185 unsafe { task.drop() };
189 queue.remove(id);
190 } else {
191 queue.reset(id, task);
192 }
193 }
194
195 queue.has_hot()
196 }
197
198 #[doc(hidden)]
200 pub fn has_task(&self) -> bool {
201 self.queue().hot_head().is_some()
202 }
203
204 pub fn clear(&self) {
211 instrument!(compio_log::Level::TRACE, "Executor::drop");
212 trace!("Dropping Executor");
213
214 while self.shared().sync.pop().is_some() {}
215 unsafe { self.queue().clear() };
216 }
217
218 #[inline(always)]
219 fn shared(&self) -> &Shared {
220 unsafe { self.ptr.as_ref() }
221 }
222
223 #[inline(always)]
224 fn queue(&self) -> &TaskQueue {
225 unsafe { self.shared().queue.get_unchecked() }
227 }
228}
229
230impl Drop for Executor {
231 fn drop(&mut self) {
232 self.clear();
233 unsafe { drop(Box::from_raw(self.ptr.as_ptr())) };
234 }
235}
236
237impl Default for Executor {
238 fn default() -> Self {
239 Self::new()
240 }
241}