// luminal/runtime/executor.rs
//! Task execution engine
//!
//! This module provides the core executor implementation for the Luminal runtime.
//! The executor is responsible for scheduling and executing tasks.

use std::cell::RefCell;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Context;
use std::thread;
use std::time::Duration;

use crossbeam_channel::unbounded;
use crossbeam_deque::{Injector, Steal, Worker};

use super::join_handle::JoinHandle;
use super::task::{BoxFuture, Task, TaskId};
use super::waker::create_task_waker;
use super::worker::WorkerThread;
21
// Thread-local worker queue intended for fast local task spawning.
// NOTE(review): this is never read or written anywhere in this file —
// `spawn_internal` always pushes to the global queue. Presumably a
// planned optimization; confirm it is used elsewhere before relying on it.
thread_local! {
    static LOCAL_WORKER: RefCell<Option<Worker<Task>>> = RefCell::new(None);
}
26
/// Inner state of the executor shared between instances.
///
/// This structure contains the shared state of the executor,
/// including the task queue, worker threads, and statistics.
/// It is held behind an `Arc` by `Executor`.
struct ExecutorInner {
    /// Global task queue used by all workers; tasks spawned via
    /// `spawn_internal` land here.
    global_queue: Arc<Injector<Task>>,

    /// Handles to the worker threads, joined when this struct is dropped.
    worker_handles: Vec<thread::JoinHandle<()>>,

    /// Flag indicating whether the executor is shutting down;
    /// set on drop so worker loops can exit.
    shutdown: Arc<AtomicBool>,

    /// Counter for generating unique task IDs (starts at 1).
    next_task_id: AtomicU64,

    /// Counter for the number of tasks processed.
    tasks_processed: Arc<AtomicUsize>,

    /// All worker stealers for work distribution.
    /// NOTE(review): stored but never read in this file — confirm it is
    /// consumed elsewhere before removing.
    all_stealers: Arc<Vec<crossbeam_deque::Stealer<Task>>>,
}
50
51impl ExecutorInner {
52 /// Creates a new executor inner state
53 ///
54 /// This initializes the global queue, creates worker threads,
55 /// and starts the runtime.
56 ///
57 /// # Returns
58 ///
59 /// A new `ExecutorInner` instance
60 fn new() -> Self {
61 let global_queue = Arc::new(Injector::new());
62 let shutdown = Arc::new(AtomicBool::new(false));
63 let tasks_processed = Arc::new(AtomicUsize::new(0));
64
65 let num_workers = std::thread::available_parallelism()
66 .map(|n| n.get())
67 .unwrap_or(4);
68
69 let mut stealers = Vec::with_capacity(num_workers);
70 let mut worker_handles = Vec::with_capacity(num_workers);
71
72
73 // Create workers for each thread (workers are not shared)
74 for i in 0..num_workers {
75 let worker = crossbeam_deque::Worker::new_fifo();
76 stealers.push(worker.stealer());
77
78 let worker_thread = WorkerThread {
79 id: i,
80 worker,
81 stealers: stealers.clone(),
82 global_queue: global_queue.clone(),
83 shutdown: shutdown.clone(),
84 tasks_processed: tasks_processed.clone(),
85 };
86
87 let handle = thread::Builder::new()
88 .name(format!("luminal-worker-{}", i))
89 .spawn(move || worker_thread.run())
90 .expect("Failed to spawn worker thread");
91
92 worker_handles.push(handle);
93 }
94
95 let all_stealers = Arc::new(stealers);
96
97 ExecutorInner {
98 global_queue,
99 worker_handles,
100 shutdown,
101 next_task_id: AtomicU64::new(1),
102 tasks_processed,
103 all_stealers,
104 }
105 }
106
107 /// Spawns a new task to be executed by the runtime
108 ///
109 /// # Parameters
110 ///
111 /// * `future` - The future to execute as a task
112 ///
113 /// # Returns
114 ///
115 /// The ID of the spawned task
116 fn spawn_internal(&self, future: BoxFuture) -> TaskId {
117 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
118 let task = Task::new(task_id, future);
119
120 // Use more efficient task distribution:
121 // Try to distribute to local queues first, fall back to global
122 self.global_queue.push(task);
123
124 task_id
125 }
126
127 /// Returns statistics about the runtime
128 ///
129 /// # Returns
130 ///
131 /// A tuple containing the current queue length and the number of tasks processed
132 fn stats(&self) -> (usize, usize) {
133 let global_len = self.global_queue.len();
134 let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
135 (global_len, tasks_processed)
136 }
137}
138
139impl Drop for ExecutorInner {
140 /// Cleans up resources when the executor is dropped
141 ///
142 /// This signals worker threads to shut down and waits for them to finish.
143 fn drop(&mut self) {
144 self.shutdown.store(true, Ordering::Release);
145
146 // Wait for workers to finish
147 for handle in self.worker_handles.drain(..) {
148 let _ = handle.join();
149 }
150 }
151}
152
/// Core task execution engine for the Luminal runtime.
///
/// The Executor is responsible for scheduling and executing tasks.
/// It maintains a global task queue and a set of worker threads that
/// process tasks using a work-stealing algorithm.
pub struct Executor {
    /// Shared executor state (queue, worker handles, counters),
    /// reference-counted so handles can share it cheaply.
    inner: Arc<ExecutorInner>,
}
162
163impl Executor {
164 /// Creates a new executor
165 ///
166 /// This initializes a new executor with worker threads based on the
167 /// number of available CPU cores.
168 ///
169 /// # Returns
170 ///
171 /// A new `Executor` instance
172 pub fn new() -> Self {
173 Executor {
174 inner: Arc::new(ExecutorInner::new()),
175 }
176 }
177
178 /// Spawns a future onto the executor
179 ///
180 /// This wraps the future in a task and schedules it for execution,
181 /// returning a JoinHandle that can be used to await its completion.
182 ///
183 /// # Type Parameters
184 ///
185 /// * `F` - The future type
186 ///
187 /// # Parameters
188 ///
189 /// * `future` - The future to execute
190 ///
191 /// # Returns
192 ///
193 /// A `JoinHandle` for the spawned task
194 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
195 where
196 F: Future + Send + 'static,
197 F::Output: Send + 'static,
198 {
199 // Use unbounded channel with correct type
200 let (sender, receiver) = unbounded::<F::Output>();
201
202 // Optimized future wrapper with less overhead
203 let wrapped_future = async move {
204 let result = future.await;
205 let _ = sender.send(result);
206 };
207
208 let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));
209
210 JoinHandle {
211 id: task_id,
212 receiver,
213 }
214 }
215
216 /// Blocks the current thread until the provided future completes
217 ///
218 /// This spawns the future as a task and then actively helps execute tasks
219 /// from the queue until the specified future completes.
220 ///
221 /// # Type Parameters
222 ///
223 /// * `F` - The future type
224 ///
225 /// # Parameters
226 ///
227 /// * `future` - The future to execute and wait for
228 ///
229 /// # Returns
230 ///
231 /// The output of the future
232 ///
233 /// # Panics
234 ///
235 /// Panics if the task times out (after 30 seconds) or the task channel is disconnected
236 pub fn block_on<F>(&self, future: F) -> F::Output
237 where
238 F: Future + Send + 'static,
239 F::Output: Send + 'static,
240 {
241 let handle = self.spawn(future);
242
243 // Optimized blocking with better work helping
244 let mut backoff_count = 0u32;
245
246 loop {
247 // Check for result first (most common case)
248 match handle.receiver.try_recv() {
249 Ok(result) => return result,
250 Err(crossbeam_channel::TryRecvError::Empty) => {
251 // More efficient work helping strategy
252 let mut helped = false;
253
254 // Try to help with multiple tasks in one go
255 for _ in 0..4 {
256 match self.inner.global_queue.steal() {
257 Steal::Success(mut task) => {
258 let waker = create_task_waker(
259 task.id,
260 self.inner.global_queue.clone()
261 );
262 let mut cx = Context::from_waker(&waker);
263
264 if let std::task::Poll::Pending = task.poll(&mut cx) {
265 self.inner.global_queue.push(task);
266 }
267 helped = true;
268 }
269 Steal::Empty => break,
270 Steal::Retry => continue,
271 }
272 }
273
274 // Optimized backoff strategy
275 if !helped {
276 backoff_count += 1;
277 if backoff_count > 1000 {
278 thread::sleep(Duration::from_nanos(100));
279 backoff_count = 0;
280 } else if backoff_count > 100 {
281 thread::yield_now();
282 }
283 // Else busy wait for better latency
284 } else {
285 backoff_count = 0;
286 }
287 }
288 Err(crossbeam_channel::TryRecvError::Disconnected) => {
289 panic!("Task execution failed - channel disconnected");
290 }
291 }
292 }
293 }
294
295 /// Runs the executor until the task queue is empty
296 ///
297 /// This method is primarily used for testing and benchmarking.
298 pub fn run(&self) {
299 // Runtime runs automatically via worker threads
300 while !self.inner.global_queue.is_empty() {
301 thread::sleep(Duration::from_millis(1));
302 }
303 }
304
305 /// Returns statistics about the executor
306 ///
307 /// # Returns
308 ///
309 /// A tuple containing the current queue length and the number of tasks processed
310 pub fn stats(&self) -> (usize, usize) {
311 self.inner.stats()
312 }
313}