// luminal/runtime/executor.rs

1//! Task execution engine
2//!
3//! This module provides the core executor implementation for the Luminal runtime.
4//! The executor is responsible for scheduling and executing tasks.
5
6use std::future::Future;
7use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::task::Context;
10use std::thread;
11use std::time::Duration;
12use std::cell::RefCell;
13
14use crossbeam_channel::unbounded;
15use crossbeam_deque::{Injector, Steal, Worker};
16
17use super::join_handle::JoinHandle;
18use super::task::{BoxFuture, Task, TaskId};
19use super::waker::create_task_waker;
20use super::worker::WorkerThread;
21
// Thread-local worker deque intended to allow fast task spawning from
// within a worker thread without touching the global injector.
// NOTE(review): nothing in this file initializes or reads LOCAL_WORKER —
// it appears to be dead code unless another module in this crate accesses
// it; confirm before removing.
thread_local! {
    static LOCAL_WORKER: RefCell<Option<Worker<Task>>> = RefCell::new(None);
}
26
/// Inner state of the executor shared between instances
///
/// This structure contains the shared state of the executor,
/// including the task queue, worker threads, and statistics.
/// It is wrapped in an `Arc` by [`Executor`], so handles share one runtime.
struct ExecutorInner {
    /// Global task queue; `spawn_internal` pushes here and workers (and
    /// `block_on`'s helping loop) steal from it
    global_queue: Arc<Injector<Task>>,
    
    /// Handles to the worker threads; joined in `Drop`
    worker_handles: Vec<thread::JoinHandle<()>>,
    
    /// Flag indicating whether the executor is shutting down
    /// (stored with `Release` ordering in `Drop`; presumably polled by
    /// `WorkerThread::run` — defined elsewhere)
    shutdown: Arc<AtomicBool>,
    
    /// Counter for generating unique task IDs (starts at 1, `Relaxed` adds)
    next_task_id: AtomicU64,
    
    /// Counter for the number of tasks processed across all workers
    tasks_processed: Arc<AtomicUsize>,
    
    /// Stealer handles for every worker's local deque
    /// NOTE(review): never read in this file — confirm external use before
    /// removing.
    all_stealers: Arc<Vec<crossbeam_deque::Stealer<Task>>>,
}
50
51impl ExecutorInner {
52    /// Creates a new executor inner state
53    ///
54    /// This initializes the global queue, creates worker threads,
55    /// and starts the runtime.
56    ///
57    /// # Returns
58    ///
59    /// A new `ExecutorInner` instance
60    fn new() -> Self {
61        let global_queue = Arc::new(Injector::new());
62        let shutdown = Arc::new(AtomicBool::new(false));
63        let tasks_processed = Arc::new(AtomicUsize::new(0));
64        
65        let num_workers = std::thread::available_parallelism()
66            .map(|n| n.get())
67            .unwrap_or(4);
68        
69        let mut stealers = Vec::with_capacity(num_workers);
70        let mut worker_handles = Vec::with_capacity(num_workers);
71        
72        
73        // Create workers for each thread (workers are not shared)
74        for i in 0..num_workers {
75            let worker = crossbeam_deque::Worker::new_fifo();
76            stealers.push(worker.stealer());
77            
78            let worker_thread = WorkerThread {
79                id: i,
80                worker,
81                stealers: stealers.clone(),
82                global_queue: global_queue.clone(),
83                shutdown: shutdown.clone(),
84                tasks_processed: tasks_processed.clone(),
85            };
86            
87            let handle = thread::Builder::new()
88                .name(format!("luminal-worker-{}", i))
89                .spawn(move || worker_thread.run())
90                .expect("Failed to spawn worker thread");
91            
92            worker_handles.push(handle);
93        }
94        
95        let all_stealers = Arc::new(stealers);
96        
97        ExecutorInner {
98            global_queue,
99            worker_handles,
100            shutdown,
101            next_task_id: AtomicU64::new(1),
102            tasks_processed,
103            all_stealers,
104        }
105    }
106    
107    /// Spawns a new task to be executed by the runtime
108    ///
109    /// # Parameters
110    ///
111    /// * `future` - The future to execute as a task
112    ///
113    /// # Returns
114    ///
115    /// The ID of the spawned task
116    fn spawn_internal(&self, future: BoxFuture) -> TaskId {
117        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
118        let task = Task::new(task_id, future);
119        
120        // Use more efficient task distribution:
121        // Try to distribute to local queues first, fall back to global
122        self.global_queue.push(task);
123        
124        task_id
125    }
126    
127    /// Returns statistics about the runtime
128    ///
129    /// # Returns
130    ///
131    /// A tuple containing the current queue length and the number of tasks processed
132    fn stats(&self) -> (usize, usize) {
133        let global_len = self.global_queue.len();
134        let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
135        (global_len, tasks_processed)
136    }
137}
138
139impl Drop for ExecutorInner {
140    /// Cleans up resources when the executor is dropped
141    ///
142    /// This signals worker threads to shut down and waits for them to finish.
143    fn drop(&mut self) {
144        self.shutdown.store(true, Ordering::Release);
145        
146        // Wait for workers to finish
147        for handle in self.worker_handles.drain(..) {
148            let _ = handle.join();
149        }
150    }
151}
152
/// Core task execution engine for the Luminal runtime
///
/// The Executor is responsible for scheduling and executing tasks.
/// It maintains a global task queue and a set of worker threads that
/// process tasks using a work-stealing algorithm.
pub struct Executor {
    /// Shared executor state; reference-counted so the running workers can
    /// outlive any one handle to them
    inner: Arc<ExecutorInner>,
}
162
impl Executor {
    /// Creates a new executor
    ///
    /// This initializes a new executor with worker threads based on the
    /// number of available CPU cores.
    ///
    /// # Returns
    ///
    /// A new `Executor` instance
    pub fn new() -> Self {
        Executor {
            inner: Arc::new(ExecutorInner::new()),
        }
    }

    /// Spawns a future onto the executor
    ///
    /// This wraps the future in a task and schedules it for execution,
    /// returning a JoinHandle that can be used to await its completion.
    ///
    /// # Type Parameters
    ///
    /// * `F` - The future type
    ///
    /// # Parameters
    ///
    /// * `future` - The future to execute
    ///
    /// # Returns
    ///
    /// A `JoinHandle` for the spawned task
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // One-shot result delivery: the wrapper sends the future's output
        // through this channel; the JoinHandle holds the receiving end.
        let (sender, receiver) = unbounded::<F::Output>();
        
        // The send result is deliberately ignored: if the JoinHandle (and
        // thus the receiver) was dropped, the output is simply discarded.
        let wrapped_future = async move {
            let result = future.await;
            let _ = sender.send(result);
        };
        
        let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));
        
        JoinHandle {
            id: task_id,
            receiver,
        }
    }

    /// Blocks the current thread until the provided future completes
    ///
    /// This spawns the future as a task and then actively helps execute tasks
    /// from the queue until the specified future completes. There is no
    /// timeout: this loops indefinitely until a result arrives.
    ///
    /// # Type Parameters
    ///
    /// * `F` - The future type
    ///
    /// # Parameters
    ///
    /// * `future` - The future to execute and wait for
    ///
    /// # Returns
    ///
    /// The output of the future
    ///
    /// # Panics
    ///
    /// Panics if the task's result channel is disconnected before the output
    /// is received (i.e. the task's sender was dropped without sending).
    pub fn block_on<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let handle = self.spawn(future);
        
        // Poll for the result while helping drain the global queue, backing
        // off incrementally when there is nothing to do.
        let mut backoff_count = 0u32;
        
        loop {
            // Check for result first (most common case)
            match handle.receiver.try_recv() {
                Ok(result) => return result,
                Err(crossbeam_channel::TryRecvError::Empty) => {
                    // No result yet: help run queued tasks instead of
                    // blocking, so progress is made even if workers are busy.
                    let mut helped = false;
                    
                    // Poll up to four stolen tasks per pass before
                    // re-checking the result channel (a Steal::Retry
                    // consumes one of the four attempts).
                    for _ in 0..4 {
                        match self.inner.global_queue.steal() {
                            Steal::Success(mut task) => {
                                let waker = create_task_waker(
                                    task.id, 
                                    self.inner.global_queue.clone()
                                );
                                let mut cx = Context::from_waker(&waker);

                                // NOTE(review): a Pending task is re-queued
                                // immediately rather than parked until its
                                // waker fires, which can busy-cycle tasks
                                // waiting on external events (and may race
                                // with the waker also re-queueing) — confirm
                                // against create_task_waker's semantics.
                                if let std::task::Poll::Pending = task.poll(&mut cx) {
                                    self.inner.global_queue.push(task);
                                }
                                helped = true;
                            }
                            Steal::Empty => break,
                            Steal::Retry => continue,
                        }
                    }
                    
                    // Graduated backoff: busy-wait first (lowest latency),
                    // then yield, then sleep briefly once persistently idle.
                    if !helped {
                        backoff_count += 1;
                        if backoff_count > 1000 {
                            thread::sleep(Duration::from_nanos(100));
                            backoff_count = 0;
                        } else if backoff_count > 100 {
                            thread::yield_now();
                        }
                        // Else busy wait for better latency
                    } else {
                        backoff_count = 0;
                    }
                }
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    panic!("Task execution failed - channel disconnected");
                }
            }
        }
    }

    /// Runs the executor until the global task queue is empty
    ///
    /// This method is primarily used for testing and benchmarking.
    /// NOTE(review): only the global injector is checked — tasks still in
    /// workers' local deques or currently being polled are not observed, so
    /// this may return while work is still in flight.
    pub fn run(&self) {
        // Workers drain the queue on their own; poll until it is empty.
        while !self.inner.global_queue.is_empty() {
            thread::sleep(Duration::from_millis(1));
        }
    }
    
    /// Returns statistics about the executor
    ///
    /// # Returns
    ///
    /// A tuple containing the current global queue length and the number of
    /// tasks processed
    pub fn stats(&self) -> (usize, usize) {
        self.inner.stats()
    }
}