// luminal/runtime/executor.rs
//! Task execution engine
//!
//! This module provides the core executor implementation for the Luminal runtime.
//! The executor is responsible for scheduling and executing tasks.
5
6#[cfg(feature = "std")]
7use std::{future::Future, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, sync::Arc, task::Context, thread, time::Duration, cell::RefCell};
8
9#[cfg(not(feature = "std"))]
10use core::{future::Future, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, task::Context, cell::RefCell};
11
12#[cfg(not(feature = "std"))]
13use alloc::{sync::Arc, vec::Vec, boxed::Box};
14
15#[cfg(feature = "std")]
16use crossbeam_channel::unbounded;
17
18#[cfg(not(feature = "std"))]
19use heapless::mpmc::MpMcQueue;
20use crossbeam_deque::{Injector, Steal, Worker};
21
22use super::join_handle::JoinHandle;
23use super::task::{BoxFuture, Task, TaskId};
24use super::waker::create_task_waker;
25use super::worker::WorkerThread;
26
// Thread-local worker queue intended for ultra-fast task spawning (std only).
// NOTE(review): not referenced anywhere in this chunk of the file — confirm it
// is used elsewhere (e.g. by spawn fast-paths) or remove it.
#[cfg(feature = "std")]
thread_local! {
    static LOCAL_WORKER: RefCell<Option<Worker<Task>>> = RefCell::new(None);
}
32
/// Inner state of the executor shared between instances (std version)
///
/// Holds the global injector queue feeding the worker threads, the join
/// handles of those threads, the shutdown flag, and runtime counters.
#[cfg(feature = "std")]
struct ExecutorInner {
    /// Global task queue; tasks pushed here are stolen by worker threads
    global_queue: Arc<Injector<Task>>,

    /// Join handles of the spawned worker threads (joined on drop)
    worker_handles: Vec<thread::JoinHandle<()>>,

    /// Set to `true` to ask the worker threads to exit
    shutdown: Arc<AtomicBool>,

    /// Monotonic counter used to mint unique task IDs
    next_task_id: AtomicU64,

    /// Counter for the number of tasks processed
    /// (presumably incremented by the worker loop — see `WorkerThread`)
    tasks_processed: Arc<AtomicUsize>,
}
57
/// Inner state of the executor shared between instances (no_std version)
///
/// Simplified state for no_std environments: a single queue plus counters,
/// with no threading support (no shutdown flag, no worker handles).
///
/// NOTE(review): `crossbeam_deque::Injector` normally requires std — confirm
/// this actually builds for the no_std target.
#[cfg(not(feature = "std"))]
struct ExecutorInner {
    /// Global task queue used by the single-threaded executor
    global_queue: Arc<Injector<Task>>,

    /// Monotonic counter used to mint unique task IDs
    next_task_id: AtomicU64,

    /// Counter for the number of tasks processed
    tasks_processed: Arc<AtomicUsize>,
}
73
74impl ExecutorInner {
75 /// Creates a new executor inner state
76 ///
77 /// This initializes the global queue, creates worker threads,
78 /// and starts the runtime.
79 ///
80 /// # Returns
81 ///
82 /// A new `ExecutorInner` instance
83 fn new() -> Self {
84 let global_queue = Arc::new(Injector::new());
85 let shutdown = Arc::new(AtomicBool::new(false));
86 let tasks_processed = Arc::new(AtomicUsize::new(0));
87
88 let num_workers = std::thread::available_parallelism()
89 .map(|n| n.get())
90 .unwrap_or(4);
91
92 let mut stealers = Vec::with_capacity(num_workers);
93 let mut worker_handles = Vec::with_capacity(num_workers);
94
95
96 // Create workers for each thread (workers are not shared)
97 for i in 0..num_workers {
98 let worker = crossbeam_deque::Worker::new_fifo();
99 stealers.push(worker.stealer());
100
101 let worker_thread = WorkerThread {
102 id: i,
103 worker,
104 stealers: stealers.clone(),
105 global_queue: global_queue.clone(),
106 shutdown: shutdown.clone(),
107 tasks_processed: tasks_processed.clone(),
108 };
109
110 let handle = thread::Builder::new()
111 .name(format!("luminal-worker-{}", i))
112 .spawn(move || worker_thread.run())
113 .expect("Failed to spawn worker thread");
114
115 worker_handles.push(handle);
116 }
117
118 // let all_stealers = Arc::new(stealers);
119
120 ExecutorInner {
121 global_queue,
122 worker_handles,
123 shutdown,
124 next_task_id: AtomicU64::new(1),
125 tasks_processed,
126 // all_stealers,
127 }
128 }
129
130 /// Spawns a new task to be executed by the runtime
131 ///
132 /// # Parameters
133 ///
134 /// * `future` - The future to execute as a task
135 ///
136 /// # Returns
137 ///
138 /// The ID of the spawned task
139 fn spawn_internal(&self, future: BoxFuture) -> TaskId {
140 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
141 let task = Task::new(task_id, future);
142
143 // Use more efficient task distribution:
144 // Try to distribute to local queues first, fall back to global
145 self.global_queue.push(task);
146
147 task_id
148 }
149
150 /// Returns statistics about the runtime
151 ///
152 /// # Returns
153 ///
154 /// A tuple containing the current queue length and the number of tasks processed
155 fn stats(&self) -> (usize, usize) {
156 let global_len = self.global_queue.len();
157 let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
158 (global_len, tasks_processed)
159 }
160}
161
// Gated on `std`: the no_std `ExecutorInner` has neither a `shutdown` flag
// nor `worker_handles`, so this impl would not compile there.
#[cfg(feature = "std")]
impl Drop for ExecutorInner {
    /// Cleans up resources when the executor is dropped
    ///
    /// Signals the worker threads to shut down, then joins each one.
    fn drop(&mut self) {
        // Release ordering so writes made before shutdown are visible to
        // workers that observe the flag.
        self.shutdown.store(true, Ordering::Release);

        // Join every worker; a panicked worker is ignored rather than
        // propagating the panic out of drop.
        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }
    }
}
175
/// Core task execution engine for the Luminal runtime
///
/// The Executor schedules tasks onto a global injector queue that worker
/// threads drain using a work-stealing algorithm. All state lives behind
/// a shared `Arc`, so handles are cheap to pass around.
pub struct Executor {
    /// Shared executor state (queue, worker handles, counters)
    inner: Arc<ExecutorInner>,
}
185
// Gated on `std`: these methods use `crossbeam_channel`, OS threads and
// `Duration`, none of which are imported in the no_std build.
#[cfg(feature = "std")]
impl Executor {
    /// Creates a new executor
    ///
    /// Worker threads are sized from the number of available CPU cores
    /// (see `ExecutorInner::new`).
    ///
    /// # Returns
    ///
    /// A new `Executor` instance
    pub fn new() -> Self {
        Executor {
            inner: Arc::new(ExecutorInner::new()),
        }
    }

    /// Spawns a future onto the executor
    ///
    /// Wraps the future so its output is delivered over a channel, then
    /// schedules it for execution, returning a `JoinHandle` that can be
    /// used to await its completion.
    ///
    /// # Parameters
    ///
    /// * `future` - The future to execute
    ///
    /// # Returns
    ///
    /// A `JoinHandle` for the spawned task
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // One-shot result channel: the wrapper sends exactly one value.
        let (sender, receiver) = unbounded::<F::Output>();

        let wrapped_future = async move {
            let result = future.await;
            // The receiver may already be dropped (detached task); that is
            // not an error, so the send result is ignored.
            let _ = sender.send(result);
        };

        let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));

        JoinHandle {
            id: task_id,
            receiver,
        }
    }

    /// Blocks the current thread until the provided future completes
    ///
    /// Spawns the future as a task and then actively helps execute tasks
    /// from the global queue until the specified future completes.
    ///
    /// # Parameters
    ///
    /// * `future` - The future to execute and wait for
    ///
    /// # Returns
    ///
    /// The output of the future
    ///
    /// # Panics
    ///
    /// Panics if the result channel is disconnected before the task
    /// completes. There is no timeout: this loops until a result arrives
    /// or the channel disconnects. (The previous documentation claimed a
    /// 30-second timeout, but no such timeout exists in the code.)
    pub fn block_on<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let handle = self.spawn(future);

        // Spin / help / back-off loop.
        let mut backoff_count = 0u32;

        loop {
            // Check for the result first (the common case once a worker
            // has picked the task up).
            match handle.receiver.try_recv() {
                Ok(result) => return result,
                Err(crossbeam_channel::TryRecvError::Empty) => {
                    let mut helped = false;

                    // Help drain the global queue, up to 4 tasks per pass.
                    for _ in 0..4 {
                        match self.inner.global_queue.steal() {
                            Steal::Success(mut task) => {
                                let waker = create_task_waker(
                                    task.id,
                                    self.inner.global_queue.clone()
                                );
                                let mut cx = Context::from_waker(&waker);

                                // NOTE(review): a pending task is re-queued
                                // immediately, which busy-polls it and may
                                // race with a wake from its waker — confirm
                                // this matches the worker-loop behavior.
                                if let std::task::Poll::Pending = task.poll(&mut cx) {
                                    self.inner.global_queue.push(task);
                                }
                                helped = true;
                            }
                            Steal::Empty => break,
                            Steal::Retry => continue,
                        }
                    }

                    // Back off only when no work was available: busy-wait
                    // first for latency, then yield, then sleep briefly.
                    if !helped {
                        backoff_count += 1;
                        if backoff_count > 1000 {
                            thread::sleep(Duration::from_nanos(100));
                            backoff_count = 0;
                        } else if backoff_count > 100 {
                            thread::yield_now();
                        }
                        // Else busy wait for better latency
                    } else {
                        backoff_count = 0;
                    }
                }
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    panic!("Task execution failed - channel disconnected");
                }
            }
        }
    }

    /// Runs the executor until the global task queue is empty
    ///
    /// Primarily used for testing and benchmarking. Note that an empty
    /// global queue does not guarantee that all work has finished: tasks
    /// may still sit in per-worker local deques or be mid-poll.
    pub fn run(&self) {
        // Workers drain the queue on their own threads; just wait.
        while !self.inner.global_queue.is_empty() {
            thread::sleep(Duration::from_millis(1));
        }
    }

    /// Returns statistics about the executor
    ///
    /// # Returns
    ///
    /// A tuple of `(current_global_queue_len, tasks_processed_so_far)`
    pub fn stats(&self) -> (usize, usize) {
        self.inner.stats()
    }
}

#[cfg(feature = "std")]
impl Default for Executor {
    /// Equivalent to [`Executor::new`].
    fn default() -> Self {
        Self::new()
    }
}