luminal/runtime/
executor.rs

1//! Task execution engine
2//!
3//! This module provides the core executor implementation for the Luminal runtime.
4//! The executor is responsible for scheduling and executing tasks.
5
6use std::future::Future;
7use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::task::Context;
10use std::thread;
11use std::time::{Duration, Instant};
12
13use crossbeam_channel::bounded;
14use crossbeam_deque::{Injector, Steal};
15
16use super::join_handle::JoinHandle;
17use super::task::{BoxFuture, Task, TaskId};
18use super::waker::create_task_waker;
19use super::worker::WorkerThread;
20
21/// Inner state of the executor shared between instances
22///
23/// This structure contains the shared state of the executor,
24/// including the task queue, worker threads, and statistics.
/// Inner state of the executor shared between instances
///
/// This structure contains the shared state of the executor,
/// including the task queue, worker threads, and statistics.
/// It is held behind an `Arc` by `Executor`, so cloned executors share one
/// runtime; dropping the last reference tears the worker pool down via `Drop`.
struct ExecutorInner {
    /// Global task queue used by all workers (crossbeam MPMC injector;
    /// workers steal from it when their local queues are empty)
    global_queue: Arc<Injector<Task>>,

    /// Handles to the worker threads, joined on shutdown
    worker_handles: Vec<thread::JoinHandle<()>>,

    /// Flag indicating whether the executor is shutting down;
    /// polled by worker threads as their exit condition
    shutdown: Arc<AtomicBool>,

    /// Counter for generating unique task IDs (starts at 1, monotonically
    /// increasing; Relaxed ordering suffices since only uniqueness matters)
    next_task_id: AtomicU64,

    /// Counter for the number of tasks processed, incremented by workers
    /// and reported through `stats()`
    tasks_processed: Arc<AtomicUsize>,
}
41
42impl ExecutorInner {
43    /// Creates a new executor inner state
44    ///
45    /// This initializes the global queue, creates worker threads,
46    /// and starts the runtime.
47    ///
48    /// # Returns
49    ///
50    /// A new `ExecutorInner` instance
51    fn new() -> Self {
52        let global_queue = Arc::new(Injector::new());
53        let shutdown = Arc::new(AtomicBool::new(false));
54        let tasks_processed = Arc::new(AtomicUsize::new(0));
55        
56        let num_workers = std::thread::available_parallelism()
57            .map(|n| n.get())
58            .unwrap_or(4);
59        
60        let mut stealers = Vec::with_capacity(num_workers);
61        let mut worker_handles = Vec::with_capacity(num_workers);
62        
63        // Create workers for each thread (workers are not shared)
64        for i in 0..num_workers {
65            let worker = crossbeam_deque::Worker::new_fifo();
66            stealers.push(worker.stealer());
67            
68            let worker_thread = WorkerThread {
69                id: i,
70                worker,
71                stealers: stealers.clone(),
72                global_queue: global_queue.clone(),
73                shutdown: shutdown.clone(),
74                tasks_processed: tasks_processed.clone(),
75            };
76            
77            let handle = thread::Builder::new()
78                .name(format!("luminal-worker-{}", i))
79                .spawn(move || worker_thread.run())
80                .expect("Failed to spawn worker thread");
81            
82            worker_handles.push(handle);
83        }
84        
85        ExecutorInner {
86            global_queue,
87            worker_handles,
88            shutdown,
89            next_task_id: AtomicU64::new(1),
90            tasks_processed,
91        }
92    }
93    
94    /// Spawns a new task to be executed by the runtime
95    ///
96    /// # Parameters
97    ///
98    /// * `future` - The future to execute as a task
99    ///
100    /// # Returns
101    ///
102    /// The ID of the spawned task
103    fn spawn_internal(&self, future: BoxFuture) -> TaskId {
104        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
105        let task = Task::new(task_id, future);
106        
107        // Always use global queue for task distribution
108        self.global_queue.push(task);
109        
110        task_id
111    }
112    
113    /// Returns statistics about the runtime
114    ///
115    /// # Returns
116    ///
117    /// A tuple containing the current queue length and the number of tasks processed
118    fn stats(&self) -> (usize, usize) {
119        let global_len = self.global_queue.len();
120        let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
121        (global_len, tasks_processed)
122    }
123}
124
125impl Drop for ExecutorInner {
126    /// Cleans up resources when the executor is dropped
127    ///
128    /// This signals worker threads to shut down and waits for them to finish.
129    fn drop(&mut self) {
130        self.shutdown.store(true, Ordering::Release);
131        
132        // Wait for workers to finish
133        for handle in self.worker_handles.drain(..) {
134            let _ = handle.join();
135        }
136    }
137}
138
139/// Core task execution engine for the Luminal runtime
140///
141/// The Executor is responsible for scheduling and executing tasks.
142/// It maintains a global task queue and a set of worker threads that
143/// process tasks using a work-stealing algorithm.
/// Core task execution engine for the Luminal runtime
///
/// The Executor is responsible for scheduling and executing tasks.
/// It maintains a global task queue and a set of worker threads that
/// process tasks using a work-stealing algorithm. Cloning-by-`Arc` means
/// the underlying worker pool is shared; it shuts down when the last
/// `Executor` referencing it is dropped.
pub struct Executor {
    /// Shared executor state (queue, workers, counters)
    inner: Arc<ExecutorInner>,
}
148
149impl Executor {
150    /// Creates a new executor
151    ///
152    /// This initializes a new executor with worker threads based on the
153    /// number of available CPU cores.
154    ///
155    /// # Returns
156    ///
157    /// A new `Executor` instance
158    pub fn new() -> Self {
159        Executor {
160            inner: Arc::new(ExecutorInner::new()),
161        }
162    }
163
164    /// Spawns a future onto the executor
165    ///
166    /// This wraps the future in a task and schedules it for execution,
167    /// returning a JoinHandle that can be used to await its completion.
168    ///
169    /// # Type Parameters
170    ///
171    /// * `F` - The future type
172    ///
173    /// # Parameters
174    ///
175    /// * `future` - The future to execute
176    ///
177    /// # Returns
178    ///
179    /// A `JoinHandle` for the spawned task
180    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
181    where
182        F: Future + Send + 'static,
183        F::Output: Send + 'static,
184    {
185        let (sender, receiver) = bounded(1);
186        
187        // We need to use a different approach for panic handling in async code
188        let wrapped_future = async move {
189            // Create a future that will be polled within a panic handler
190            let result = future.await;
191            
192            // Always try to send the result, even if the receiver is dropped
193            // This prevents tasks from being stuck if their results aren't needed
194            let _ = sender.send(result);
195        };
196        
197        // Spawn with panic handling wrapper
198        let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));
199        
200        JoinHandle {
201            id: task_id,
202            receiver,
203        }
204    }
205
206    /// Blocks the current thread until the provided future completes
207    ///
208    /// This spawns the future as a task and then actively helps execute tasks
209    /// from the queue until the specified future completes.
210    ///
211    /// # Type Parameters
212    ///
213    /// * `F` - The future type
214    ///
215    /// # Parameters
216    ///
217    /// * `future` - The future to execute and wait for
218    ///
219    /// # Returns
220    ///
221    /// The output of the future
222    ///
223    /// # Panics
224    ///
225    /// Panics if the task times out (after 30 seconds) or the task channel is disconnected
226    pub fn block_on<F>(&self, future: F) -> F::Output
227    where
228        F: Future + Send + 'static,
229        F::Output: Send + 'static,
230    {
231        let handle = self.spawn(future);
232        
233        // Help with work while waiting for result
234        let start = Instant::now();
235        let mut spin_count = 0u32;
236        
237        loop {
238            match handle.receiver.try_recv() {
239                Ok(result) => return result,
240                Err(crossbeam_channel::TryRecvError::Empty) => {
241                    // Help process work to avoid deadlocks
242                    match self.inner.global_queue.steal() {
243                        Steal::Success(mut task) => {
244                            let waker = create_task_waker(
245                                task.id, 
246                                self.inner.global_queue.clone()
247                            );
248                            let mut cx = Context::from_waker(&waker);
249
250                            if let std::task::Poll::Pending = task.poll(&mut cx) {
251                                self.inner.global_queue.push(task);
252                            }
253                            spin_count = 0;
254                        }
255                        Steal::Empty => {
256                            spin_count += 1;
257                            if spin_count > 1000 {
258                                thread::yield_now();
259                                spin_count = 0;
260                            }
261
262                            // Timeout protection
263                            if start.elapsed() > Duration::from_secs(30) {
264                                panic!("Task timed out after 30 seconds");
265                            }
266                        }
267                        Steal::Retry => {
268                            // Optionally handle retry, here we just yield
269                            thread::yield_now();
270                        }
271                    }
272                }
273                Err(crossbeam_channel::TryRecvError::Disconnected) => {
274                    // Channel closed: this happens if task panicked or was dropped
275                    // Instead of panicking, handle gracefully
276                    eprintln!("ERROR: Task channel disconnected - possible causes:");
277                    eprintln!("  1. Task panicked during execution");
278                    eprintln!("  2. Worker thread terminated unexpectedly");
279                    eprintln!("  3. Memory corruption or use-after-free in task execution");
280                    eprintln!("Try adding more yields in CPU-intensive tasks with .await points");
281                    
282                    // For debugging, we'll exit with a specific code to identify this error
283                    std::process::exit(101);
284                }
285            }
286        }
287    }
288
289    /// Runs the executor until the task queue is empty
290    ///
291    /// This method is primarily used for testing and benchmarking.
292    pub fn run(&self) {
293        // Runtime runs automatically via worker threads
294        while !self.inner.global_queue.is_empty() {
295            thread::sleep(Duration::from_millis(1));
296        }
297    }
298    
299    /// Returns statistics about the executor
300    ///
301    /// # Returns
302    ///
303    /// A tuple containing the current queue length and the number of tasks processed
304    pub fn stats(&self) -> (usize, usize) {
305        self.inner.stats()
306    }
307}