maniac_runtime/runtime/
mod.rs

1//! Executor module providing a cooperative async executor.
2//!
3//! This module implements a high-performance async executor that uses memory-mapped
4//! task arenas and worker threads for efficient task scheduling and execution.
5//!
6//! # Architecture
7//!
8//! The executor consists of:
9//! - `Executor`: Public API for spawning and managing async tasks
10//! - `ExecutorInner`: Internal shared state for task spawning
11//! - `WorkerService`: Manages worker threads that execute tasks
12//! - `TaskArena`: Memory-mapped arena for task storage
13//! - `JoinHandle`: Future that resolves when a spawned task completes
14
15pub mod deque;
16pub mod mpsc;
17pub mod preemption;
18pub mod signal;
19pub mod summary;
20pub mod task;
21pub mod ticker;
22#[cfg(test)]
23mod ticker_tests;
24pub mod timer;
25pub mod timer_wheel;
26pub mod waker;
27pub mod worker;
28#[cfg(test)]
29mod preemption_tests;
30
31use std::any::TypeId;
32use std::future::{Future, IntoFuture};
33use std::io;
34use std::panic::Location;
35use std::pin::Pin;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::sync::{Arc, Mutex};
38use std::task::{Context, Poll, Waker};
39use std::thread;
40use std::time::Duration;
41use task::{
42    FutureAllocator, SpawnError, TaskArena, TaskArenaConfig, TaskArenaOptions, TaskArenaStats,
43    TaskHandle,
44};
45use ticker::{TickService};
46use worker::{WorkerService, WorkerServiceConfig};
47
48use crate::num_cpus;
49
50pub fn new_single_threaded() -> io::Result<DefaultRuntime> {
51    let tick_service = TickService::new(Duration::from_millis(1));
52    tick_service.start();
53    Ok(DefaultRuntime {
54        executor: DefaultExecutor::new_with_tick_service(
55            TaskArenaConfig::new(8, 4096).unwrap(),
56            TaskArenaOptions::new(false, false),
57            1,
58            1,
59            Arc::clone(&tick_service),
60        )?,
61        blocking: DefaultBlockingExecutor::new_with_tick_service(
62            TaskArenaConfig::new(8, 4096).unwrap(),
63            TaskArenaOptions::new(false, false),
64            1,
65            num_cpus() * 8,
66            Arc::clone(&tick_service),
67        )?,
68        tick_service,
69    })
70}
71
72pub fn new_multi_threaded(
73    worker_count: usize,
74    mut max_tasks: usize,
75    blocking_min_workers: usize,
76    blocking_max_workers: usize,
77    mut max_blocking_tasks: usize,
78) -> io::Result<DefaultRuntime> {
79    let worker_count = worker_count.max(1);
80    if max_tasks == 0 {
81        max_tasks = worker_count * 4096;
82    }
83    max_tasks = max_tasks.next_power_of_two();
84    let config = ExecutorConfig::with_max_tasks(worker_count, worker_count, max_tasks)?;
85
86    let blocking_min_workers = blocking_min_workers.max(1);
87    let blocking_max_workers = blocking_max_workers.max(blocking_min_workers);
88    if max_blocking_tasks == 0 {
89        max_blocking_tasks = blocking_max_workers * 4096;
90    }
91    let blocking_config = ExecutorConfig::with_max_tasks(
92        blocking_min_workers,
93        blocking_max_workers,
94        max_blocking_tasks,
95    )?;
96    let tick_service = TickService::new(Duration::from_millis(1));
97    tick_service.start();
98
99    Ok(DefaultRuntime {
100        executor: DefaultExecutor::new_with_tick_service(
101            config.task_arena,
102            config.task_arena_options,
103            config.min_workers,
104            config.max_workers,
105            Arc::clone(&tick_service),
106        )?,
107        blocking: DefaultBlockingExecutor::new_with_tick_service(
108            blocking_config.task_arena,
109            blocking_config.task_arena_options,
110            blocking_config.min_workers,
111            blocking_config.max_workers,
112            Arc::clone(&tick_service),
113        )?,
114        tick_service,
115    })
116}
117
/// A complete runtime pairing an async executor with a dedicated blocking
/// executor, both driven by one shared tick service.
///
/// Const parameters mirror [`Executor`]'s: `P` / `BLOCKING_P` are priority
/// levels and `NUM_SEGS_P2` / `BLOCKING_NUM_SEGS_P2` are segment counts
/// expressed as powers of two, for the async and blocking pools respectively.
pub struct Runtime<
    const P: usize = 10,
    const NUM_SEGS_P2: usize = 8,
    const BLOCKING_P: usize = 8,
    const BLOCKING_NUM_SEGS_P2: usize = 5,
> {
    /// Executor for ordinary async tasks (used by [`Runtime::spawn`]).
    executor: Executor<P, NUM_SEGS_P2>,
    /// Executor reserved for blocking work (used by [`Runtime::spawn_blocking`]).
    blocking: Executor<BLOCKING_P, BLOCKING_NUM_SEGS_P2>,
    /// Shared tick service for time-based operations.
    /// NOTE(review): not read after construction in this module — presumably
    /// held only to keep the tick thread alive for the runtime's lifetime;
    /// confirm.
    tick_service: Arc<TickService>,
}

/// [`Runtime`] instantiated with the default const parameters.
pub type DefaultRuntime = Runtime<10, 8, 8, 5>;
130
impl<
    const P: usize,
    const NUM_SEGS_P2: usize,
    const BLOCKING_P: usize,
    const BLOCKING_NUM_SEGS_P2: usize,
> Runtime<P, NUM_SEGS_P2, BLOCKING_P, BLOCKING_NUM_SEGS_P2>
{
    /// Spawns an asynchronous task on the async executor, returning an awaitable join handle.
    ///
    /// This is the primary method for scheduling async work on the runtime.
    /// The spawned task will be executed by one of the async executor's worker threads.
    ///
    /// # Arguments
    ///
    /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
    ///
    /// # Returns
    ///
    /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
    /// * `Err(SpawnError)`: Error if spawning fails
    ///
    /// # Example
    ///
    /// ```no_run
    /// let rt = maniac::runtime::new_multi_threaded(4, 0, 1, 4, 0)?;
    /// let handle = rt.spawn(async {
    ///     // Your async code here
    ///     42
    /// })?;
    ///
    /// // Await the result
    /// // let result = handle.await;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[track_caller]
    pub fn spawn<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
    where
        F: IntoFuture<Output = T> + Send + 'static,
        F::IntoFuture: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Capture spawn-site metadata; forwarded to the executor for
        // (future) diagnostics.
        let location = Location::caller();
        let type_id = TypeId::of::<F>();
        self.executor.inner.spawn(type_id, location, future)
    }

    /// Spawns a task on the dedicated blocking executor, returning an awaitable join handle.
    ///
    /// Use this for work that may occupy its worker thread for a long time
    /// (heavy computation, synchronous I/O), so it runs on the blocking pool
    /// rather than the async executor's workers.
    ///
    /// # Arguments
    ///
    /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
    ///
    /// # Returns
    ///
    /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
    /// * `Err(SpawnError)`: Error if spawning fails
    ///
    /// # Example
    ///
    /// ```no_run
    /// let rt = maniac::runtime::new_multi_threaded(4, 0, 1, 4, 0)?;
    /// let handle = rt.spawn_blocking(async {
    ///     // Blocking or long-running work here
    ///     42
    /// })?;
    ///
    /// // Await the result
    /// // let result = handle.await;
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    #[track_caller]
    pub fn spawn_blocking<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
    where
        F: IntoFuture<Output = T> + Send + 'static,
        F::IntoFuture: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Same metadata capture as `spawn`, but targets the blocking pool.
        let location = Location::caller();
        let type_id = TypeId::of::<F>();
        self.blocking.inner.spawn(type_id, location, future)
    }
}
223
/// Cooperative async executor backed by a memory-mapped task arena and worker threads.
225///
226/// The executor provides a high-performance async executor that uses:
227/// - Memory-mapped task arenas for efficient task storage
228/// - Worker threads for parallel task execution
229/// - Cooperative scheduling for optimal throughput
230///
231/// # Type Parameters
232///
233/// - `P`: Priority levels (default: 10)
/// - `NUM_SEGS_P2`: Number of segments as a power of 2 (default: 8, meaning 2^8 = 256 segments)
235///
236/// # Example
237///
238/// ```no_run
239/// use maniac::runtime::Executor;
240/// use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
241///
/// let executor = Executor::new(
///     TaskArenaConfig::new(1, 8).unwrap(),
///     TaskArenaOptions::default(),
///     4, // worker_count
///     8, // max_worker_count
/// ).unwrap();
247///
248/// let handle = executor.spawn(async {
249///     // Your async code here
250///     42
251/// }).unwrap();
252///
253/// // Later, await the result
254/// // let result = handle.await;
255/// ```
#[derive(Clone)]
pub struct Executor<const P: usize = 10, const NUM_SEGS_P2: usize = 8> {
    /// Internal executor state shared across all operations
    inner: Arc<ExecutorInner<P, NUM_SEGS_P2>>,
    /// Optionally owned tick service - if Some, this executor is responsible for shutting it down
    owned_tick_service: Option<Arc<TickService>>,
}

/// Async executor with the default parameters (10 priority levels, 2^8 segments).
pub type DefaultExecutor = Executor<10, 8>;
/// Smaller configuration used for the blocking pool (8 priority levels, 2^5 segments).
pub type DefaultBlockingExecutor = Executor<8, 5>;
266
267impl<const P: usize, const NUM_SEGS_P2: usize> Executor<P, NUM_SEGS_P2> {
268    pub fn new_single_threaded() -> Self {
269        Self::new(
270            TaskArenaConfig::new(8, 4096).unwrap(),
271            TaskArenaOptions::new(false, false),
272            1,
273            1,
274        )
275        .unwrap()
276    }
277}
278
279/// Internal executor state shared between all executor operations.
280///
281/// This structure holds the core components needed for task spawning and execution:
282/// - `shutdown`: Atomic flag indicating whether the executor is shutting down
283/// - `tick_service`: Shared tick service for time-based operations
284/// - `service`: Worker service that manages task scheduling and execution
struct ExecutorInner<const P: usize, const NUM_SEGS_P2: usize> {
    /// Atomic flag indicating if the executor is shutting down.
    /// Used to prevent new tasks from being spawned during shutdown.
    shutdown: Arc<AtomicBool>,
    /// Shared tick service for time-based operations across worker services.
    /// Held here to keep the shared tick thread alive for this executor's
    /// lifetime; not read directly by this struct's methods.
    tick_service: Arc<TickService>,
    /// Worker service that manages task scheduling, worker threads, and task execution.
    service: Arc<WorkerService<P, NUM_SEGS_P2>>,
}
294
295impl<const P: usize, const NUM_SEGS_P2: usize> ExecutorInner<P, NUM_SEGS_P2> {
296    /// Spawns a new async task on the executor.
297    ///
298    /// This method performs the following steps:
299    /// 1. Validates that the executor is not shutting down and the arena is open
300    /// 2. Reserves a task slot from the worker service
301    /// 3. Initializes the task in the arena with its global ID
302    /// 4. Wraps the user's future in a join future that handles completion and cleanup
303    /// 5. Attaches the future to the task and schedules it for execution
304    ///
305    /// # Arguments
306    ///
307    /// * `future`: The future to execute. Must implement `IntoFuture` and be `Send + 'static`.
308    ///
309    /// # Returns
310    ///
311    /// * `Ok(JoinHandle<T>)`: A join handle that can be awaited to get the task's result
312    /// * `Err(SpawnError)`: Error if spawning fails (closed executor, no capacity, or attach failed)
313    ///
314    /// # Errors
315    ///
316    /// - `SpawnError::Closed`: Executor is shutting down or arena is closed
317    /// - `SpawnError::NoCapacity`: No available task slots in the arena
318    /// - `SpawnError::AttachFailed`: Failed to attach the future to the task
319    fn spawn<F, T>(
320        &self,
321        type_id: TypeId,
322        location: &'static Location,
323        future: F,
324    ) -> Result<JoinHandle<T>, SpawnError>
325    where
326        F: IntoFuture<Output = T> + Send + 'static,
327        F::IntoFuture: Future<Output = T> + Send + 'static,
328        T: Send + 'static,
329    {
330        let arena = self.service.arena();
331
332        // Early return if executor is shutting down or arena is closed
333        // This prevents spawning new tasks during shutdown
334        if self.shutdown.load(Ordering::Acquire) || arena.is_closed() {
335            return Err(SpawnError::Closed);
336        }
337
338        // Reserve a task slot from the worker service
339        // Returns None if no capacity is available
340        let Some(handle) = self.service.reserve_task() else {
341            return Err(if arena.is_closed() {
342                SpawnError::Closed
343            } else {
344                SpawnError::NoCapacity
345            });
346        };
347
348        // Calculate the global task ID and initialize the task in the arena
349        // The summary pointer is used for task tracking and statistics
350        let global_id = handle.global_id(arena.tasks_per_leaf());
351        arena.init_task(global_id, self.service.summary() as *const _);
352
353        // Get the task reference and prepare handles for cleanup
354        let task = handle.task();
355        let release_handle = TaskHandle::from_non_null(handle.as_non_null());
356        let service = Arc::clone(&self.service);
357
358        // Create shared state for the join handle
359        // This allows the spawned task to signal completion and the join handle to await it
360        let shared = Arc::new(JoinShared::new());
361        let shared_for_future = Arc::clone(&shared);
362        let release_handle_for_future = release_handle;
363        let service_for_future = Arc::clone(&self.service);
364
365        // Wrap the user's future in a join future that:
366        // 1. Awaits the user's future
367        // 2. Signals completion via JoinShared
368        // 3. Releases the task handle back to the worker service
369        let join_future = async move {
370            let result = future.into_future().await;
371            shared_for_future.complete(result);
372            service_for_future.release_task(release_handle_for_future);
373        };
374
375        // Allocate the future on the heap using the custom allocator
376        let future_ptr = FutureAllocator::box_future(join_future);
377
378        // Attach the future to the task
379        // If attachment fails, clean up the allocated future and release the task handle
380        if task.attach_future(future_ptr).is_err() {
381            unsafe { FutureAllocator::drop_boxed(future_ptr) };
382            self.service.release_task(release_handle);
383            return Err(SpawnError::AttachFailed);
384        }
385
386        // Schedule the task for execution by worker threads
387        task.schedule();
388
389        // Return a join handle that the caller can await
390        Ok(JoinHandle::new(shared))
391    }
392
393    /// Initiates shutdown of the executor.
394    ///
395    /// This method:
396    /// 1. Sets the shutdown flag atomically (idempotent - safe to call multiple times)
397    /// 2. Closes the task arena to prevent new tasks
398    /// 3. Signals the worker service to shut down
399    ///
400    /// After shutdown is initiated, no new tasks can be spawned, but existing tasks
401    /// will continue to run until they complete.
402    fn shutdown(&self) {
403        // Use swap to atomically set shutdown flag and check if it was already set
404        // Return early if shutdown was already initiated
405        if self.shutdown.swap(true, Ordering::Release) {
406            return;
407        }
408
409        // Close the arena to prevent new task initialization
410        self.service.arena().close();
411
412        // Signal worker service to begin shutdown process
413        self.service.shutdown();
414    }
415}
416
/// Sizing parameters used to construct an [`Executor`] and its task arena.
pub struct ExecutorConfig {
    /// Arena layout: leaf count and tasks per leaf.
    task_arena: TaskArenaConfig,
    /// Arena initialization options (e.g. memory-mapping flags).
    task_arena_options: TaskArenaOptions,
    /// Minimum number of worker threads (started immediately).
    min_workers: usize,
    /// Upper bound on worker threads.
    max_workers: usize,
}
423
424impl ExecutorConfig {
425    pub fn new(
426        task_arena: TaskArenaConfig,
427        task_arena_options: TaskArenaOptions,
428        min_workers: usize,
429        max_workers: usize,
430    ) -> ExecutorConfig {
431        Self {
432            task_arena,
433            task_arena_options,
434            min_workers,
435            max_workers,
436        }
437    }
438
439    pub fn with_max_tasks(
440        min_workers: usize,
441        max_workers: usize,
442        max_tasks: usize,
443    ) -> io::Result<Self> {
444        let min_workers = min_workers.max(1);
445        let max_workers = max_workers.max(min_workers);
446        let max_tasks = max_tasks.max(max_workers);
447        let mut tasks_per_leaf = 4096;
448        let mut leaf_count = (max_tasks / tasks_per_leaf).max(1);
449
450        if leaf_count < max_workers {
451            leaf_count = max_workers;
452            tasks_per_leaf = (max_tasks / leaf_count).max(1);
453        }
454
455        Ok(Self {
456            task_arena: TaskArenaConfig::new(leaf_count, tasks_per_leaf)?,
457            task_arena_options: TaskArenaOptions::new(false, false),
458            min_workers,
459            max_workers,
460        })
461    }
462}
463
impl<const P: usize, const NUM_SEGS_P2: usize> Executor<P, NUM_SEGS_P2> {
    /// Creates a new executor that owns its own tick service.
    ///
    /// This method:
    /// 1. Creates and starts a dedicated tick service
    /// 2. Delegates construction to [`Executor::new_with_tick_service`]
    /// 3. Records tick-service ownership so `Drop` shuts the tick service down
    ///
    /// # Arguments
    ///
    /// * `config`: Configuration for the task arena (capacity, layout, etc.)
    /// * `options`: Options for arena initialization (memory mapping, etc.)
    /// * `worker_count`: Number of worker threads to start (clamped to at least 1)
    /// * `max_worker_count`: Upper bound on worker threads (clamped to at least `worker_count`)
    ///
    /// # Returns
    ///
    /// * `Ok(Executor)`: Successfully created executor instance
    /// * `Err(io::Error)`: Error if arena initialization fails (e.g., memory mapping issues)
    ///
    /// # Example
    ///
    /// ```no_run
    /// use maniac::runtime::Executor;
    /// use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
    ///
    /// let executor = Executor::new(
    ///     TaskArenaConfig::new(1, 8).unwrap(),
    ///     TaskArenaOptions::default(),
    ///     4, // worker_count
    ///     8, // max_worker_count
    /// )?;
    /// # Ok::<(), std::io::Error>(())
    /// ```
    pub fn new(
        config: TaskArenaConfig,
        options: TaskArenaOptions,
        worker_count: usize,
        max_worker_count: usize,
    ) -> io::Result<Self> {
        let worker_config = WorkerServiceConfig::default();
        // Create shared tick service for time-based operations
        // This allows multiple worker services to share a single tick thread
        let tick_service = TickService::new(worker_config.tick_duration);
        tick_service.start();

        let mut executor = Self::new_with_tick_service(config, options, worker_count, max_worker_count, Arc::clone(&tick_service))?;
        // Mark that this executor owns the tick service and is responsible for shutting it down
        executor.owned_tick_service = Some(tick_service);
        Ok(executor)
    }

    /// Creates a new executor that uses an externally owned tick service.
    ///
    /// Unlike [`Executor::new`], the returned executor does **not** own the
    /// tick service and will not shut it down on drop; the caller remains
    /// responsible for the tick service's lifecycle. This allows several
    /// executors (e.g. an async pool and a blocking pool) to share one tick
    /// thread.
    ///
    /// # Arguments
    ///
    /// * `config`: Configuration for the task arena (capacity, layout, etc.)
    /// * `options`: Options for arena initialization (memory mapping, etc.)
    /// * `worker_count`: Number of worker threads to start (clamped to at least 1)
    /// * `max_worker_count`: Upper bound on worker threads (clamped to at least `worker_count`)
    /// * `tick_service`: Already-started tick service shared with the caller
    ///
    /// # Returns
    ///
    /// * `Ok(Executor)`: Successfully created executor instance
    /// * `Err(io::Error)`: Error if arena initialization fails (e.g., memory mapping issues)
    pub fn new_with_tick_service(
        config: TaskArenaConfig,
        options: TaskArenaOptions,
        worker_count: usize,
        max_worker_count: usize,
        tick_service: Arc<TickService>,
    ) -> io::Result<Self> {
        let worker_count = worker_count.max(1);
        let max_worker_count = max_worker_count.max(worker_count);

        // Initialize the memory-mapped task arena with the provided configuration
        let arena = TaskArena::with_config(config, options)?;

        // Create worker service configuration:
        // min_workers is the requested count so workers start immediately on
        // WorkerService::start(); max_workers caps later growth and is
        // clamped above to at least min_workers.
        let worker_config = WorkerServiceConfig {
            min_workers: worker_count,
            max_workers: max_worker_count,
            ..WorkerServiceConfig::default()
        };

        // Start the worker service with the arena and configuration
        // This spawns worker threads that will execute tasks
        let service = WorkerService::<P, NUM_SEGS_P2>::start(arena, worker_config, &tick_service);

        // Initialize shutdown flag to false (executor is active)
        let shutdown = Arc::new(AtomicBool::new(false));

        // Create internal executor state shared across all operations
        let inner = Arc::new(ExecutorInner::<P, NUM_SEGS_P2> {
            shutdown: Arc::clone(&shutdown),
            tick_service,
            service: Arc::clone(&service),
        });

        Ok(Self {
            inner,
            owned_tick_service: None, // Not owned when using an external tick service
        })
    }

    /// Spawns an asynchronous task on the executor, returning an awaitable join handle.
    ///
    /// This is the primary method for scheduling async work on the executor.
    /// The spawned task will be executed by one of the executor's worker threads.
    ///
    /// # Arguments
    ///
    /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
    ///
    /// # Returns
    ///
    /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
    /// * `Err(SpawnError)`: Error if spawning fails
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use maniac::runtime::Executor;
    /// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
    /// # let executor = Executor::new(
    /// #     TaskArenaConfig::new(1, 8).unwrap(),
    /// #     TaskArenaOptions::default(),
    /// #     1,
    /// #     1,
    /// # ).unwrap();
    /// let handle = executor.spawn(async {
    ///     // Your async code here
    ///     42
    /// })?;
    ///
    /// // Await the result
    /// // let result = handle.await;
    /// # Ok::<(), maniac::runtime::task::SpawnError>(())
    /// ```
    #[track_caller]
    pub fn spawn<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
    where
        F: IntoFuture<Output = T> + Send + 'static,
        F::IntoFuture: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Capture spawn-site metadata and delegate to the shared inner state.
        let location = Location::caller();
        let type_id = TypeId::of::<F>();
        self.inner.spawn(type_id, location, future)
    }

    /// Returns current arena statistics.
    ///
    /// This provides insight into the executor's current state, including:
    /// - Number of active tasks
    /// - Task capacity and utilization
    /// - Other arena-specific metrics
    ///
    /// # Returns
    ///
    /// `TaskArenaStats` containing current executor statistics
    pub fn stats(&self) -> TaskArenaStats {
        self.inner.service.arena().stats()
    }

    /// Returns a reference to the underlying worker service.
    ///
    /// This allows direct access to worker service operations such as:
    /// - Interrupting workers for preemptive scheduling
    /// - Accessing worker statistics and health information
    /// - Managing worker threads dynamically
    ///
    /// # Returns
    ///
    /// An `Arc` to the `WorkerService` managing this executor's workers
    pub fn service(&self) -> &Arc<WorkerService<P, NUM_SEGS_P2>> {
        &self.inner.service
    }
}
661
662impl<const P: usize, const NUM_SEGS_P2: usize> Drop for Executor<P, NUM_SEGS_P2> {
663    /// Cleans up the executor when it goes out of scope.
664    ///
665    /// This implementation:
666    /// 1. Initiates shutdown of the executor (closes arena, signals workers)
667    /// 2. If this executor owns a tick service, shuts it down as well
668    /// 3. Unparks any worker threads that might be waiting
669    /// 4. Joins all worker threads to ensure clean shutdown
670    ///
671    /// Note: Currently, worker threads are managed by `WorkerService`, so the
672    /// `workers` vector is typically empty. This code is prepared for future
673    /// scenarios where we might manage worker threads directly.
674    fn drop(&mut self) {
675        // Initiate shutdown process (idempotent)
676        self.inner.shutdown();
677
678        // If this executor owns the tick service, shut it down
679        // This must happen after worker shutdown to avoid issues with the tick handler
680        if let Some(tick_service) = &self.owned_tick_service {
681            tick_service.shutdown();
682        }
683
684        // // Unpark any waiting worker threads to allow them to exit
685        // for worker in &self.workers {
686        //     worker.thread().unpark();
687        // }
688        //
689        // // Wait for all worker threads to complete
690        // // drain(..) consumes the vector and returns an iterator
691        // for handle in self.workers.drain(..) {
692        //     let _ = handle.join();
693        // }
694    }
695}
696
697/// Shared state between a spawned task and its join handle.
698///
699/// This structure enables communication between:
700/// - The spawned task (which calls `complete()` when done)
701/// - The join handle (which calls `poll()` to await completion)
702///
703/// It uses a lock-free fast path (atomic `ready` flag) with a fallback to
704/// mutex-protected state for storing the result and waker.
struct JoinShared<T> {
    /// Set to `true` (Release) once the result has been stored.
    /// Backs the non-blocking `is_finished` check.
    ready: AtomicBool,
    /// Result and registered waker, guarded by a mutex that is only held
    /// briefly while storing/taking the result or registering a waker.
    state: Mutex<JoinSharedState<T>>,
}

/// Mutex-protected interior of `JoinShared`.
struct JoinSharedState<T> {
    /// The task's output; `None` until completion, and `None` again after
    /// the join handle takes it.
    result: Option<T>,
    /// Waker from the most recent pending poll, notified on completion.
    waker: Option<Waker>,
}

impl<T> JoinShared<T> {
    /// Creates a new `JoinShared` instance in the initial (pending) state.
    fn new() -> Self {
        Self {
            ready: AtomicBool::new(false),
            state: Mutex::new(JoinSharedState {
                result: None,
                waker: None,
            }),
        }
    }

    /// Marks the task as complete and stores the result.
    ///
    /// Called by the spawned task when it finishes execution. Idempotent:
    /// only the first call stores a value; later calls are ignored. If a
    /// waker is registered, it is woken after the lock is released.
    ///
    /// # Arguments
    ///
    /// * `value`: The result value from the completed task
    fn complete(&self, value: T) {
        let mut state = self.state.lock().unwrap();

        // Idempotency check: never overwrite an existing result.
        if state.result.is_some() {
            return;
        }

        // Store the result.
        state.result = Some(value);

        // Release ordering makes the stored result visible to readers that
        // observe `ready == true` with Acquire.
        self.ready.store(true, Ordering::Release);

        // Take the waker and drop the lock before waking, so arbitrary
        // waker code never runs while the mutex is held.
        let waker = state.waker.take();
        drop(state);

        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Polls for the task's completion (called by `JoinHandle`'s `poll`).
    ///
    /// Returns `Poll::Ready` with the result exactly once; before completion
    /// (or after the result has been consumed) it registers `cx`'s waker and
    /// returns `Poll::Pending`.
    ///
    /// # Arguments
    ///
    /// * `cx`: The async context, used to register a waker for notification
    fn poll(&self, cx: &mut Context<'_>) -> Poll<T> {
        // Single lock acquisition. (A previous version first checked the
        // `ready` flag as a "fast path", but still had to lock the mutex to
        // take the result, so the extra check only added a redundant second
        // lock round-trip on the ready path.)
        let mut state = self.state.lock().unwrap();

        if let Some(value) = state.result.take() {
            return Poll::Ready(value);
        }

        // Still pending (or already consumed by an earlier poll): register
        // the waker so completion can notify this poller.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }

    /// Returns `true` if the task has completed.
    ///
    /// Non-blocking check via the atomic flag. Note: the result may have
    /// already been taken by a previous poll.
    fn is_finished(&self) -> bool {
        self.ready.load(Ordering::Acquire)
    }
}
817
/// Awaitable join handle returned from [`Executor::spawn`].
///
/// Implements `Future<Output = T>`: awaiting it yields the result of the
/// spawned task. The result is moved out of the shared state on the first
/// poll that observes it, so it is delivered at most once.
///
/// NOTE(review): earlier docs claimed this handle "can be cloned and shared,
/// allowing multiple futures to await the same task". No `Clone` impl is
/// visible in this file, and `JoinShared::poll` removes the result with
/// `Option::take`, so a second awaiter would pend forever — confirm the
/// intended semantics before relying on multi-awaiter support.
///
/// # Example
///
/// ```no_run
/// # use maniac::runtime::Executor;
/// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
/// # let executor = Executor::new(
/// #     TaskArenaConfig::new(1, 8).unwrap(),
/// #     TaskArenaOptions::default(),
/// #     1,
/// # ).unwrap();
/// let handle = executor.spawn(async { 42 })?;
///
/// // Await the result
/// let result = handle.await;
/// # Ok::<(), maniac::runtime::task::SpawnError>(())
/// ```
pub struct JoinHandle<T> {
    /// State shared with the spawned task: the result slot and registered
    /// waker (behind the mutex) plus the atomic completion flag.
    shared: Arc<JoinShared<T>>,
}
844
845impl<T> JoinHandle<T> {
846    /// Creates a new join handle from shared state.
847    ///
848    /// This is an internal constructor used by `ExecutorInner::spawn`.
849    fn new(shared: Arc<JoinShared<T>>) -> Self {
850        Self { shared }
851    }
852
853    /// Returns `true` if the task has completed.
854    ///
855    /// This is a non-blocking check. Note that even if `is_finished()` returns
856    /// `true`, the result may have already been consumed by a previous await.
857    ///
858    /// # Example
859    ///
860    /// ```no_run
861    /// # use maniac::runtime::Executor;
862    /// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
863    /// # let executor = Executor::new(
864    /// #     TaskArenaConfig::new(1, 8).unwrap(),
865    /// #     TaskArenaOptions::default(),
866    /// #     1,
867    /// # ).unwrap();
868    /// let handle = executor.spawn(async { 42 })?;
869    ///
870    /// // Check if task is done (non-blocking)
871    /// if handle.is_finished() {
872    ///     println!("Task completed!");
873    /// }
874    /// # Ok::<(), maniac::runtime::task::SpawnError>(())
875    /// ```
876    pub fn is_finished(&self) -> bool {
877        self.shared.is_finished()
878    }
879}
880
881impl<T: Send + 'static> Future for JoinHandle<T> {
882    type Output = T;
883
884    /// Polls the join handle for completion.
885    ///
886    /// This method:
887    /// - Returns `Poll::Ready(T)` if the task has completed
888    /// - Returns `Poll::Pending` if the task is still running
889    /// - Registers a waker to be notified when the task completes
890    ///
891    /// # Arguments
892    ///
893    /// * `cx`: The async context containing the waker
894    ///
895    /// # Returns
896    ///
897    /// * `Poll::Ready(T)`: Task completed, returns the result
898    /// * `Poll::Pending`: Task still running, will wake when done
899    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
900        self.shared.poll(cx)
901    }
902}
903
#[cfg(test)]
mod tests {
    use super::*;
    use crate::future::block_on;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds the minimal single-threaded executor used by the tests below.
    fn create_executor() -> DefaultExecutor {
        DefaultExecutor::new_single_threaded()
    }

    /// A spawned task runs to completion, its result is delivered through the
    /// join handle, and the executor cleans the task up afterwards.
    #[test]
    fn executor_spawn_completes() {
        let executor = create_executor();

        // Shared counter proves the task body actually executed.
        let counter = Arc::new(AtomicUsize::new(0));
        let cloned = counter.clone();

        // The task increments the counter and returns the new value.
        let join = executor
            .spawn(async move { cloned.fetch_add(1, Ordering::Relaxed) + 1 })
            .expect("spawn");

        // Block until the task completes and collect the result.
        let result = block_on(join);

        assert_eq!(result, 1);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
        // Completed tasks must be released from the arena.
        assert_eq!(executor.stats().active_tasks, 0);
    }

    /// `JoinHandle::is_finished()` eventually observes completion, and the
    /// task is cleaned up after being awaited.
    ///
    /// The previous version asserted `!join.is_finished()` immediately after
    /// spawning, which races with the worker thread (the task can finish
    /// before the assertion runs) and made the test flaky. We instead wait
    /// for completion and assert the positive case, which is deterministic.
    #[test]
    fn join_handle_reports_completion() {
        let executor = create_executor();

        // Spawn an empty task that completes as soon as the worker polls it.
        let join = executor.spawn(async {}).expect("spawn");

        // Wait for the worker to report completion, yielding instead of
        // spinning hot. Bounded in practice by the test-harness timeout.
        while !join.is_finished() {
            std::thread::yield_now();
        }
        assert!(join.is_finished());

        // Awaiting a finished task must still hand back cleanly.
        block_on(join);

        // Verify the task was cleaned up after completion.
        assert_eq!(executor.stats().active_tasks, 0);
    }
}