// luminal/runtime/executor.rs

1//! Task execution engine
2//!
3//! This module provides the core executor implementation for the Luminal runtime.
4//! The executor is responsible for scheduling and executing tasks.
5
6#[cfg(feature = "std")]
7use std::{future::Future, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, sync::Arc, task::Context, thread, time::Duration, cell::RefCell};
8
9#[cfg(not(feature = "std"))]
10use core::{future::Future, sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, task::Context, cell::RefCell};
11
12#[cfg(not(feature = "std"))]
13use alloc::{sync::Arc, vec::Vec, boxed::Box};
14
15#[cfg(feature = "std")]
16use crossbeam_channel::unbounded;
17
18#[cfg(not(feature = "std"))]
19use heapless::mpmc::MpMcQueue;
20use crossbeam_deque::{Injector, Steal, Worker};
21
22use super::join_handle::JoinHandle;
23use super::task::{BoxFuture, Task, TaskId};
24use super::waker::create_task_waker;
25use super::worker::WorkerThread;
26
// Thread-local worker intended for ultra-fast task spawning (std only).
// NOTE(review): this static is never read or written anywhere in this file —
// `spawn_internal` always pushes straight to the global injector. Presumably
// reserved for a planned local-queue fast path; confirm before relying on it
// (or remove if it is truly dead).
#[cfg(feature = "std")]
thread_local! {
    static LOCAL_WORKER: RefCell<Option<Worker<Task>>> = RefCell::new(None);
}
32
/// Inner state of the executor shared between instances (std version)
///
/// Holds the global injector queue, the handles of the spawned worker
/// threads, the shutdown flag, and counters for task-id generation and
/// throughput statistics. Shared pieces are wrapped in `Arc` so each
/// worker thread can hold its own clone.
#[cfg(feature = "std")]
struct ExecutorInner {
    /// Global task queue used by all workers
    global_queue: Arc<Injector<Task>>,

    /// Handles to the worker threads; drained and joined on `Drop`
    worker_handles: Vec<thread::JoinHandle<()>>,

    /// Flag indicating whether the executor is shutting down;
    /// set (Release) in `Drop` and polled by the worker run loops
    shutdown: Arc<AtomicBool>,

    /// Counter for generating unique task IDs (starts at 1)
    next_task_id: AtomicU64,

    /// Total number of tasks processed across all workers
    tasks_processed: Arc<AtomicUsize>,
}
57
/// Inner state of the executor shared between instances (no_std version)
///
/// Simplified state for no_std environments: no worker threads and no
/// shutdown flag — just the task queue and the counters used by a
/// single-threaded executor.
#[cfg(not(feature = "std"))]
struct ExecutorInner {
    /// Global task queue used by the single-threaded executor
    global_queue: Arc<Injector<Task>>,

    /// Counter for generating unique task IDs (starts at 1)
    next_task_id: AtomicU64,

    /// Total number of tasks processed
    tasks_processed: Arc<AtomicUsize>,
}
73
74impl ExecutorInner {
75    /// Creates a new executor inner state
76    ///
77    /// This initializes the global queue, creates worker threads,
78    /// and starts the runtime.
79    ///
80    /// # Returns
81    ///
82    /// A new `ExecutorInner` instance
83    fn new() -> Self {
84        let global_queue = Arc::new(Injector::new());
85        let shutdown = Arc::new(AtomicBool::new(false));
86        let tasks_processed = Arc::new(AtomicUsize::new(0));
87        
88        let num_workers = std::thread::available_parallelism()
89            .map(|n| n.get())
90            .unwrap_or(4);
91        
92        let mut stealers = Vec::with_capacity(num_workers);
93        let mut worker_handles = Vec::with_capacity(num_workers);
94        
95        
96        // Create workers for each thread (workers are not shared)
97        for i in 0..num_workers {
98            let worker = crossbeam_deque::Worker::new_fifo();
99            stealers.push(worker.stealer());
100            
101            let worker_thread = WorkerThread {
102                id: i,
103                worker,
104                stealers: stealers.clone(),
105                global_queue: global_queue.clone(),
106                shutdown: shutdown.clone(),
107                tasks_processed: tasks_processed.clone(),
108            };
109            
110            let handle = thread::Builder::new()
111                .name(format!("luminal-worker-{}", i))
112                .spawn(move || worker_thread.run())
113                .expect("Failed to spawn worker thread");
114            
115            worker_handles.push(handle);
116        }
117        
118        // let all_stealers = Arc::new(stealers);
119        
120        ExecutorInner {
121            global_queue,
122            worker_handles,
123            shutdown,
124            next_task_id: AtomicU64::new(1),
125            tasks_processed,
126            // all_stealers,
127        }
128    }
129    
130    /// Spawns a new task to be executed by the runtime
131    ///
132    /// # Parameters
133    ///
134    /// * `future` - The future to execute as a task
135    ///
136    /// # Returns
137    ///
138    /// The ID of the spawned task
139    fn spawn_internal(&self, future: BoxFuture) -> TaskId {
140        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::Relaxed));
141        let task = Task::new(task_id, future);
142        
143        // Use more efficient task distribution:
144        // Try to distribute to local queues first, fall back to global
145        self.global_queue.push(task);
146        
147        task_id
148    }
149    
150    /// Returns statistics about the runtime
151    ///
152    /// # Returns
153    ///
154    /// A tuple containing the current queue length and the number of tasks processed
155    fn stats(&self) -> (usize, usize) {
156        let global_len = self.global_queue.len();
157        let tasks_processed = self.tasks_processed.load(Ordering::Relaxed);
158        (global_len, tasks_processed)
159    }
160}
161
// BUGFIX: this impl must be gated on `std` — it touches `shutdown` and
// `worker_handles`, fields that do not exist on the no_std `ExecutorInner`,
// so the un-gated version fails to compile for no_std builds.
#[cfg(feature = "std")]
impl Drop for ExecutorInner {
    /// Cleans up resources when the executor is dropped
    ///
    /// This signals worker threads to shut down and waits for them to finish.
    fn drop(&mut self) {
        // Release store so the flag is visible to the worker run loops
        // (assuming they load it with Acquire — see worker.rs).
        self.shutdown.store(true, Ordering::Release);

        // Join every worker; a panicked worker's payload is deliberately
        // discarded — `drop` must not propagate panics.
        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }
    }
}
175
/// Core task execution engine for the Luminal runtime
///
/// The Executor is responsible for scheduling and executing tasks.
/// It maintains a global task queue and a set of worker threads that
/// process tasks using a work-stealing algorithm.
pub struct Executor {
    /// Shared executor state, reference-counted so spawned wrappers and
    /// worker threads can hold onto it independently of this value
    inner: Arc<ExecutorInner>,
}
185
186impl Executor {
187    /// Creates a new executor
188    ///
189    /// This initializes a new executor with worker threads based on the
190    /// number of available CPU cores.
191    ///
192    /// # Returns
193    ///
194    /// A new `Executor` instance
195    pub fn new() -> Self {
196        Executor {
197            inner: Arc::new(ExecutorInner::new()),
198        }
199    }
200
201    /// Spawns a future onto the executor
202    ///
203    /// This wraps the future in a task and schedules it for execution,
204    /// returning a JoinHandle that can be used to await its completion.
205    ///
206    /// # Type Parameters
207    ///
208    /// * `F` - The future type
209    ///
210    /// # Parameters
211    ///
212    /// * `future` - The future to execute
213    ///
214    /// # Returns
215    ///
216    /// A `JoinHandle` for the spawned task
217    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
218    where
219        F: Future + Send + 'static,
220        F::Output: Send + 'static,
221    {
222        // Use unbounded channel with correct type
223        let (sender, receiver) = unbounded::<F::Output>();
224        
225        // Optimized future wrapper with less overhead
226        let wrapped_future = async move {
227            let result = future.await;
228            let _ = sender.send(result);
229        };
230        
231        let task_id = self.inner.spawn_internal(Box::pin(wrapped_future));
232        
233        JoinHandle {
234            id: task_id,
235            receiver,
236        }
237    }
238
239    /// Blocks the current thread until the provided future completes
240    ///
241    /// This spawns the future as a task and then actively helps execute tasks
242    /// from the queue until the specified future completes.
243    ///
244    /// # Type Parameters
245    ///
246    /// * `F` - The future type
247    ///
248    /// # Parameters
249    ///
250    /// * `future` - The future to execute and wait for
251    ///
252    /// # Returns
253    ///
254    /// The output of the future
255    ///
256    /// # Panics
257    ///
258    /// Panics if the task times out (after 30 seconds) or the task channel is disconnected
259    pub fn block_on<F>(&self, future: F) -> F::Output
260    where
261        F: Future + Send + 'static,
262        F::Output: Send + 'static,
263    {
264        let handle = self.spawn(future);
265        
266        // Optimized blocking with better work helping
267        let mut backoff_count = 0u32;
268        
269        loop {
270            // Check for result first (most common case)
271            match handle.receiver.try_recv() {
272                Ok(result) => return result,
273                Err(crossbeam_channel::TryRecvError::Empty) => {
274                    // More efficient work helping strategy
275                    let mut helped = false;
276                    
277                    // Try to help with multiple tasks in one go
278                    for _ in 0..4 {
279                        match self.inner.global_queue.steal() {
280                            Steal::Success(mut task) => {
281                                let waker = create_task_waker(
282                                    task.id, 
283                                    self.inner.global_queue.clone()
284                                );
285                                let mut cx = Context::from_waker(&waker);
286
287                                if let std::task::Poll::Pending = task.poll(&mut cx) {
288                                    self.inner.global_queue.push(task);
289                                }
290                                helped = true;
291                            }
292                            Steal::Empty => break,
293                            Steal::Retry => continue,
294                        }
295                    }
296                    
297                    // Optimized backoff strategy
298                    if !helped {
299                        backoff_count += 1;
300                        if backoff_count > 1000 {
301                            thread::sleep(Duration::from_nanos(100));
302                            backoff_count = 0;
303                        } else if backoff_count > 100 {
304                            thread::yield_now();
305                        }
306                        // Else busy wait for better latency
307                    } else {
308                        backoff_count = 0;
309                    }
310                }
311                Err(crossbeam_channel::TryRecvError::Disconnected) => {
312                    panic!("Task execution failed - channel disconnected");
313                }
314            }
315        }
316    }
317
318    /// Runs the executor until the task queue is empty
319    ///
320    /// This method is primarily used for testing and benchmarking.
321    pub fn run(&self) {
322        // Runtime runs automatically via worker threads
323        while !self.inner.global_queue.is_empty() {
324            thread::sleep(Duration::from_millis(1));
325        }
326    }
327    
328    /// Returns statistics about the executor
329    ///
330    /// # Returns
331    ///
332    /// A tuple containing the current queue length and the number of tasks processed
333    pub fn stats(&self) -> (usize, usize) {
334        self.inner.stats()
335    }
336}