// luminal/runtime/executor.rs
1//! Task execution engine
2//!
3//! This module provides the core executor implementation for the Luminal runtime.
4//! The executor is responsible for scheduling and executing tasks.
5
6use std::future::Future;
7use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::task::Context;
10use std::thread;
11use std::time::{Duration, Instant};
12
13use crossbeam_channel::bounded;
14use crossbeam_deque::{Injector, Steal};
15
16use super::join_handle::JoinHandle;
17use super::task::{BoxFuture, Task, TaskId};
18use super::waker::create_task_waker;
19use super::worker::WorkerThread;
20
21/// Inner state of the executor shared between instances
22///
23/// This structure contains the shared state of the executor,
24/// including the task queue, worker threads, and statistics.
/// Inner state of the executor shared between instances
///
/// This structure contains the shared state of the executor,
/// including the task queue, worker threads, and statistics.
/// It is held behind an `Arc` by `Executor`; cleanup (shutdown
/// signalling and thread joining) happens in its `Drop` impl.
struct ExecutorInner {
    /// Global task queue used by all workers.
    /// `spawn_internal` pushes every new task here; a clone of this
    /// `Arc` is handed to each `WorkerThread` at construction.
    global_queue: Arc<Injector<Task>>,

    /// Handles to the worker threads; drained and joined on drop.
    worker_handles: Vec<thread::JoinHandle<()>>,

    /// Flag indicating whether the executor is shutting down.
    /// Set with `Release` ordering in `Drop`; shared with every worker.
    shutdown: Arc<AtomicBool>,

    /// Counter for generating unique task IDs (starts at 1, monotonically
    /// increasing via `fetch_add`).
    next_task_id: AtomicU64,

    /// Counter for the number of tasks processed, shared with the workers
    /// which increment it (presumably per completed poll — confirm in worker.rs).
    tasks_processed: Arc<AtomicUsize>,
}
41
42impl ExecutorInner {
43 /// Creates a new executor inner state
44 ///
45 /// This initializes the global queue, creates worker threads,
46 /// and starts the runtime.
47 ///
48 /// # Returns
49 ///
50 /// A new `ExecutorInner` instance
51 fn new() -> Self {
52 let global_queue = Arc::new(Injector::new());
53 let shutdown = Arc::new(AtomicBool::new(false));
54 let tasks_processed = Arc::new(AtomicUsize::new(0));
55
56 let num_workers = std::thread::available_parallelism()
57 .map(|n| n.get())
58 .unwrap_or(4);
59
60 let mut stealers = Vec::with_capacity(num_workers);
61 let mut worker_handles = Vec::with_capacity(num_workers);
62
63 // Create workers for each thread (workers are not shared)
64 for i in 0..num_workers {
65 let worker = crossbeam_deque::Worker::new_fifo();
66 stealers.push(worker.stealer());
67
68 let worker_thread = WorkerThread {
69 id: i,
70 worker,
71 stealers: stealers.clone(),
72 global_queue: global_queue.clone(),
73 shutdown: shutdown.clone(),
74 tasks_processed: tasks_processed.clone(),
75 };
76
77 let handle = thread::Builder::new()
78 .name(format!("luminal-worker-{}", i))
79 .spawn(move || worker_thread.run())
80 .expect("Failed to spawn worker thread");
81
82 worker_handles.push(handle);
83 }
84
85 ExecutorInner {
86 global_queue,
87 worker_handles,
88 shutdown,
89 next_task_id: AtomicU64::new(1),
90 tasks_processed,
91 }
92 }
93
94 /// Spawns a new task to be executed by the runtime
95 ///
96 /// # Parameters
97 ///
98 /// * `future` - The future to execute as a task
99 ///
100 /// # Returns
101 ///
102 /// The ID of the spawned task
103 fn spawn_internal(&self, future: BoxFuture) -> TaskId {
104 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
105 let task = Task::new(task_id, future);
106
107 // Always use global queue for task distribution
108 self.global_queue.push(task);
109
110 task_id
111 }
112
113 /// Returns statistics about the runtime
114 ///
115 /// # Returns
116 ///
117 /// A tuple containing the current queue length and the number of tasks processed
118 fn stats(&self) -> (usize, usize) {
119 let global_len = self.global_queue.len();
120 let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
121 (global_len, tasks_processed)
122 }
123}
124
125impl Drop for ExecutorInner {
126 /// Cleans up resources when the executor is dropped
127 ///
128 /// This signals worker threads to shut down and waits for them to finish.
129 fn drop(&mut self) {
130 self.shutdown.store(true, Ordering::Release);
131
132 // Wait for workers to finish
133 for handle in self.worker_handles.drain(..) {
134 let _ = handle.join();
135 }
136 }
137}
138
139/// Core task execution engine for the Luminal runtime
140///
141/// The Executor is responsible for scheduling and executing tasks.
142/// It maintains a global task queue and a set of worker threads that
143/// process tasks using a work-stealing algorithm.
/// Core task execution engine for the Luminal runtime
///
/// The Executor is responsible for scheduling and executing tasks.
/// It maintains a global task queue and a set of worker threads that
/// process tasks using a work-stealing algorithm.
///
/// Cloning/sharing is achieved by holding the state in an `Arc`;
/// dropping the last `Executor` shuts the workers down (see
/// `ExecutorInner`'s `Drop` impl).
pub struct Executor {
    /// Shared executor state (queue, workers, counters)
    inner: Arc<ExecutorInner>,
}
148
149impl Executor {
150 /// Creates a new executor
151 ///
152 /// This initializes a new executor with worker threads based on the
153 /// number of available CPU cores.
154 ///
155 /// # Returns
156 ///
157 /// A new `Executor` instance
158 pub fn new() -> Self {
159 Executor {
160 inner: Arc::new(ExecutorInner::new()),
161 }
162 }
163
164 /// Spawns a future onto the executor
165 ///
166 /// This wraps the future in a task and schedules it for execution,
167 /// returning a JoinHandle that can be used to await its completion.
168 ///
169 /// # Type Parameters
170 ///
171 /// * `F` - The future type
172 ///
173 /// # Parameters
174 ///
175 /// * `future` - The future to execute
176 ///
177 /// # Returns
178 ///
179 /// A `JoinHandle` for the spawned task
180 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
181 where
182 F: Future + Send + 'static,
183 F::Output: Send + 'static,
184 {
185 let (sender, receiver) = bounded(1);
186
187 // We need to use a different approach for panic handling in async code
188 let wrapped_future = async move {
189 // Create a future that will be polled within a panic handler
190 let result = future.await;
191
192 // Always try to send the result, even if the receiver is dropped
193 // This prevents tasks from being stuck if their results aren't needed
194 let _ = sender.send(result);
195 };
196
197 // Spawn with panic handling wrapper
198 let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));
199
200 JoinHandle {
201 id: task_id,
202 receiver,
203 }
204 }
205
206 /// Blocks the current thread until the provided future completes
207 ///
208 /// This spawns the future as a task and then actively helps execute tasks
209 /// from the queue until the specified future completes.
210 ///
211 /// # Type Parameters
212 ///
213 /// * `F` - The future type
214 ///
215 /// # Parameters
216 ///
217 /// * `future` - The future to execute and wait for
218 ///
219 /// # Returns
220 ///
221 /// The output of the future
222 ///
223 /// # Panics
224 ///
225 /// Panics if the task times out (after 30 seconds) or the task channel is disconnected
226 pub fn block_on<F>(&self, future: F) -> F::Output
227 where
228 F: Future + Send + 'static,
229 F::Output: Send + 'static,
230 {
231 let handle = self.spawn(future);
232
233 // Help with work while waiting for result
234 let start = Instant::now();
235 let mut spin_count = 0u32;
236
237 loop {
238 match handle.receiver.try_recv() {
239 Ok(result) => return result,
240 Err(crossbeam_channel::TryRecvError::Empty) => {
241 // Help process work to avoid deadlocks
242 match self.inner.global_queue.steal() {
243 Steal::Success(mut task) => {
244 let waker = create_task_waker(
245 task.id,
246 self.inner.global_queue.clone()
247 );
248 let mut cx = Context::from_waker(&waker);
249
250 if let std::task::Poll::Pending = task.poll(&mut cx) {
251 self.inner.global_queue.push(task);
252 }
253 spin_count = 0;
254 }
255 Steal::Empty => {
256 spin_count += 1;
257 if spin_count > 1000 {
258 thread::yield_now();
259 spin_count = 0;
260 }
261
262 // Timeout protection
263 if start.elapsed() > Duration::from_secs(30) {
264 panic!("Task timed out after 30 seconds");
265 }
266 }
267 Steal::Retry => {
268 // Optionally handle retry, here we just yield
269 thread::yield_now();
270 }
271 }
272 }
273 Err(crossbeam_channel::TryRecvError::Disconnected) => {
274 // Channel closed: this happens if task panicked or was dropped
275 // Instead of panicking, handle gracefully
276 eprintln!("ERROR: Task channel disconnected - possible causes:");
277 eprintln!(" 1. Task panicked during execution");
278 eprintln!(" 2. Worker thread terminated unexpectedly");
279 eprintln!(" 3. Memory corruption or use-after-free in task execution");
280 eprintln!("Try adding more yields in CPU-intensive tasks with .await points");
281
282 // For debugging, we'll exit with a specific code to identify this error
283 std::process::exit(101);
284 }
285 }
286 }
287 }
288
289 /// Runs the executor until the task queue is empty
290 ///
291 /// This method is primarily used for testing and benchmarking.
292 pub fn run(&self) {
293 // Runtime runs automatically via worker threads
294 while !self.inner.global_queue.is_empty() {
295 thread::sleep(Duration::from_millis(1));
296 }
297 }
298
299 /// Returns statistics about the executor
300 ///
301 /// # Returns
302 ///
303 /// A tuple containing the current queue length and the number of tasks processed
304 pub fn stats(&self) -> (usize, usize) {
305 self.inner.stats()
306 }
307}