maniac_runtime/runtime/mod.rs
1//! Executor module providing a cooperative async executor.
2//!
3//! This module implements a high-performance async executor that uses memory-mapped
4//! task arenas and worker threads for efficient task scheduling and execution.
5//!
6//! # Architecture
7//!
8//! The executor consists of:
9//! - `Executor`: Public API for spawning and managing async tasks
10//! - `ExecutorInner`: Internal shared state for task spawning
11//! - `WorkerService`: Manages worker threads that execute tasks
12//! - `TaskArena`: Memory-mapped arena for task storage
13//! - `JoinHandle`: Future that resolves when a spawned task completes
14
15pub mod deque;
16pub mod mpsc;
17pub mod preemption;
18pub mod signal;
19pub mod summary;
20pub mod task;
21pub mod ticker;
22#[cfg(test)]
23mod ticker_tests;
24pub mod timer;
25pub mod timer_wheel;
26pub mod waker;
27pub mod worker;
28#[cfg(test)]
29mod preemption_tests;
30
31use std::any::TypeId;
32use std::future::{Future, IntoFuture};
33use std::io;
34use std::panic::Location;
35use std::pin::Pin;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::sync::{Arc, Mutex};
38use std::task::{Context, Poll, Waker};
39use std::thread;
40use std::time::Duration;
41use task::{
42 FutureAllocator, SpawnError, TaskArena, TaskArenaConfig, TaskArenaOptions, TaskArenaStats,
43 TaskHandle,
44};
45use ticker::{TickService};
46use worker::{WorkerService, WorkerServiceConfig};
47
48use crate::num_cpus;
49
50pub fn new_single_threaded() -> io::Result<DefaultRuntime> {
51 let tick_service = TickService::new(Duration::from_millis(1));
52 tick_service.start();
53 Ok(DefaultRuntime {
54 executor: DefaultExecutor::new_with_tick_service(
55 TaskArenaConfig::new(8, 4096).unwrap(),
56 TaskArenaOptions::new(false, false),
57 1,
58 1,
59 Arc::clone(&tick_service),
60 )?,
61 blocking: DefaultBlockingExecutor::new_with_tick_service(
62 TaskArenaConfig::new(8, 4096).unwrap(),
63 TaskArenaOptions::new(false, false),
64 1,
65 num_cpus() * 8,
66 Arc::clone(&tick_service),
67 )?,
68 tick_service,
69 })
70}
71
72pub fn new_multi_threaded(
73 worker_count: usize,
74 mut max_tasks: usize,
75 blocking_min_workers: usize,
76 blocking_max_workers: usize,
77 mut max_blocking_tasks: usize,
78) -> io::Result<DefaultRuntime> {
79 let worker_count = worker_count.max(1);
80 if max_tasks == 0 {
81 max_tasks = worker_count * 4096;
82 }
83 max_tasks = max_tasks.next_power_of_two();
84 let config = ExecutorConfig::with_max_tasks(worker_count, worker_count, max_tasks)?;
85
86 let blocking_min_workers = blocking_min_workers.max(1);
87 let blocking_max_workers = blocking_max_workers.max(blocking_min_workers);
88 if max_blocking_tasks == 0 {
89 max_blocking_tasks = blocking_max_workers * 4096;
90 }
91 let blocking_config = ExecutorConfig::with_max_tasks(
92 blocking_min_workers,
93 blocking_max_workers,
94 max_blocking_tasks,
95 )?;
96 let tick_service = TickService::new(Duration::from_millis(1));
97 tick_service.start();
98
99 Ok(DefaultRuntime {
100 executor: DefaultExecutor::new_with_tick_service(
101 config.task_arena,
102 config.task_arena_options,
103 config.min_workers,
104 config.max_workers,
105 Arc::clone(&tick_service),
106 )?,
107 blocking: DefaultBlockingExecutor::new_with_tick_service(
108 blocking_config.task_arena,
109 blocking_config.task_arena_options,
110 blocking_config.min_workers,
111 blocking_config.max_workers,
112 Arc::clone(&tick_service),
113 )?,
114 tick_service,
115 })
116}
117
118pub struct Runtime<
119 const P: usize = 10,
120 const NUM_SEGS_P2: usize = 8,
121 const BLOCKING_P: usize = 8,
122 const BLOCKING_NUM_SEGS_P2: usize = 5,
123> {
124 executor: Executor<P, NUM_SEGS_P2>,
125 blocking: Executor<BLOCKING_P, BLOCKING_NUM_SEGS_P2>,
126 tick_service: Arc<TickService>,
127}
128
129pub type DefaultRuntime = Runtime<10, 8, 8, 5>;
130
131impl<
132 const P: usize,
133 const NUM_SEGS_P2: usize,
134 const BLOCKING_P: usize,
135 const BLOCKING_NUM_SEGS_P2: usize,
136> Runtime<P, NUM_SEGS_P2, BLOCKING_P, BLOCKING_NUM_SEGS_P2>
137{
138 /// Spawns an asynchronous task on the executor, returning an awaitable join handle.
139 ///
140 /// This is the primary method for scheduling async work on the executor.
141 /// The spawned task will be executed by one of the executor's worker threads.
142 ///
143 /// # Arguments
144 ///
145 /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
146 ///
147 /// # Returns
148 ///
149 /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
150 /// * `Err(SpawnError)`: Error if spawning fails
151 ///
152 /// # Example
153 ///
154 /// ```no_run
155 /// use maniac::runtime::Executor;
156 /// let rt = manic::runtime::new_multi_threaded();
157 /// let handle = rt.spawn(async {
158 /// // Your async code here
159 /// 42
160 /// })?;
161 ///
162 /// // Await the result
163 /// // let result = handle.await;
164 /// # Ok::<(), maniac::runtime::task::SpawnError>(())
165 /// ```
166 #[track_caller]
167 pub fn spawn<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
168 where
169 F: IntoFuture<Output = T> + Send + 'static,
170 F::IntoFuture: Future<Output = T> + Send + 'static,
171 T: Send + 'static,
172 {
173 let location = Location::caller();
174 let type_id = TypeId::of::<F>();
175 self.executor.inner.spawn(type_id, location, future)
176 }
177
178 /// Spawns a blocking task on the blocking executor, returning an awaitable join handle.
179 ///
180 /// This is the primary method for scheduling async work on the executor.
181 /// The spawned task will be executed by one of the executor's worker threads.
182 ///
183 /// # Arguments
184 ///
185 /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
186 ///
187 /// # Returns
188 ///
189 /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
190 /// * `Err(SpawnError)`: Error if spawning fails
191 ///
192 /// # Example
193 ///
194 /// ```no_run
195 /// # use maniac::runtime::Executor;
196 /// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
197 /// # let executor = Executor::new(
198 /// # TaskArenaConfig::new(1, 8).unwrap(),
199 /// # TaskArenaOptions::default(),
200 /// # 1,
201 /// # ).unwrap();
202 /// let handle = executor.spawn(async {
203 /// // Your async code here
204 /// 42
205 /// })?;
206 ///
207 /// // Await the result
208 /// // let result = handle.await;
209 /// # Ok::<(), maniac::runtime::task::SpawnError>(())
210 /// ```
211 #[track_caller]
212 pub fn spawn_blocking<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
213 where
214 F: IntoFuture<Output = T> + Send + 'static,
215 F::IntoFuture: Future<Output = T> + Send + 'static,
216 T: Send + 'static,
217 {
218 let location = Location::caller();
219 let type_id = TypeId::of::<F>();
220 self.blocking.inner.spawn(type_id, location, future)
221 }
222}
223
224/// Cooperative executor executor backed by `MmapExecutorArena` and worker threads.
225///
226/// The executor provides a high-performance async executor that uses:
227/// - Memory-mapped task arenas for efficient task storage
228/// - Worker threads for parallel task execution
229/// - Cooperative scheduling for optimal throughput
230///
231/// # Type Parameters
232///
233/// - `P`: Priority levels (default: 10)
234/// - `NUM_SEGS_P2`: Number of segments as a power of 2 (default: 6, meaning 2^6 = 64 segments)
235///
236/// # Example
237///
238/// ```no_run
239/// use maniac::runtime::Executor;
240/// use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
241///
242/// let executor = Executor::new(
243/// TaskArenaConfig::new(1, 8).unwrap(),
244/// TaskArenaOptions::default(),
245/// 4, // worker_count
246/// ).unwrap();
247///
248/// let handle = executor.spawn(async {
249/// // Your async code here
250/// 42
251/// }).unwrap();
252///
253/// // Later, await the result
254/// // let result = handle.await;
255/// ```
256#[derive(Clone)]
257pub struct Executor<const P: usize = 10, const NUM_SEGS_P2: usize = 8> {
258 /// Internal executor state shared across all operations
259 inner: Arc<ExecutorInner<P, NUM_SEGS_P2>>,
260 /// Optionally owned tick service - if Some, this executor is responsible for shutting it down
261 owned_tick_service: Option<Arc<TickService>>,
262}
263
264pub type DefaultExecutor = Executor<10, 8>;
265pub type DefaultBlockingExecutor = Executor<8, 5>;
266
267impl<const P: usize, const NUM_SEGS_P2: usize> Executor<P, NUM_SEGS_P2> {
268 pub fn new_single_threaded() -> Self {
269 Self::new(
270 TaskArenaConfig::new(8, 4096).unwrap(),
271 TaskArenaOptions::new(false, false),
272 1,
273 1,
274 )
275 .unwrap()
276 }
277}
278
279/// Internal executor state shared between all executor operations.
280///
281/// This structure holds the core components needed for task spawning and execution:
282/// - `shutdown`: Atomic flag indicating whether the executor is shutting down
283/// - `tick_service`: Shared tick service for time-based operations
284/// - `service`: Worker service that manages task scheduling and execution
285struct ExecutorInner<const P: usize, const NUM_SEGS_P2: usize> {
286 /// Atomic flag indicating if the executor is shutting down.
287 /// Used to prevent new tasks from being spawned during shutdown.
288 shutdown: Arc<AtomicBool>,
289 /// Shared tick service for time-based operations across worker services.
290 tick_service: Arc<TickService>,
291 /// Worker service that manages task scheduling, worker threads, and task execution.
292 service: Arc<WorkerService<P, NUM_SEGS_P2>>,
293}
294
295impl<const P: usize, const NUM_SEGS_P2: usize> ExecutorInner<P, NUM_SEGS_P2> {
296 /// Spawns a new async task on the executor.
297 ///
298 /// This method performs the following steps:
299 /// 1. Validates that the executor is not shutting down and the arena is open
300 /// 2. Reserves a task slot from the worker service
301 /// 3. Initializes the task in the arena with its global ID
302 /// 4. Wraps the user's future in a join future that handles completion and cleanup
303 /// 5. Attaches the future to the task and schedules it for execution
304 ///
305 /// # Arguments
306 ///
307 /// * `future`: The future to execute. Must implement `IntoFuture` and be `Send + 'static`.
308 ///
309 /// # Returns
310 ///
311 /// * `Ok(JoinHandle<T>)`: A join handle that can be awaited to get the task's result
312 /// * `Err(SpawnError)`: Error if spawning fails (closed executor, no capacity, or attach failed)
313 ///
314 /// # Errors
315 ///
316 /// - `SpawnError::Closed`: Executor is shutting down or arena is closed
317 /// - `SpawnError::NoCapacity`: No available task slots in the arena
318 /// - `SpawnError::AttachFailed`: Failed to attach the future to the task
319 fn spawn<F, T>(
320 &self,
321 type_id: TypeId,
322 location: &'static Location,
323 future: F,
324 ) -> Result<JoinHandle<T>, SpawnError>
325 where
326 F: IntoFuture<Output = T> + Send + 'static,
327 F::IntoFuture: Future<Output = T> + Send + 'static,
328 T: Send + 'static,
329 {
330 let arena = self.service.arena();
331
332 // Early return if executor is shutting down or arena is closed
333 // This prevents spawning new tasks during shutdown
334 if self.shutdown.load(Ordering::Acquire) || arena.is_closed() {
335 return Err(SpawnError::Closed);
336 }
337
338 // Reserve a task slot from the worker service
339 // Returns None if no capacity is available
340 let Some(handle) = self.service.reserve_task() else {
341 return Err(if arena.is_closed() {
342 SpawnError::Closed
343 } else {
344 SpawnError::NoCapacity
345 });
346 };
347
348 // Calculate the global task ID and initialize the task in the arena
349 // The summary pointer is used for task tracking and statistics
350 let global_id = handle.global_id(arena.tasks_per_leaf());
351 arena.init_task(global_id, self.service.summary() as *const _);
352
353 // Get the task reference and prepare handles for cleanup
354 let task = handle.task();
355 let release_handle = TaskHandle::from_non_null(handle.as_non_null());
356 let service = Arc::clone(&self.service);
357
358 // Create shared state for the join handle
359 // This allows the spawned task to signal completion and the join handle to await it
360 let shared = Arc::new(JoinShared::new());
361 let shared_for_future = Arc::clone(&shared);
362 let release_handle_for_future = release_handle;
363 let service_for_future = Arc::clone(&self.service);
364
365 // Wrap the user's future in a join future that:
366 // 1. Awaits the user's future
367 // 2. Signals completion via JoinShared
368 // 3. Releases the task handle back to the worker service
369 let join_future = async move {
370 let result = future.into_future().await;
371 shared_for_future.complete(result);
372 service_for_future.release_task(release_handle_for_future);
373 };
374
375 // Allocate the future on the heap using the custom allocator
376 let future_ptr = FutureAllocator::box_future(join_future);
377
378 // Attach the future to the task
379 // If attachment fails, clean up the allocated future and release the task handle
380 if task.attach_future(future_ptr).is_err() {
381 unsafe { FutureAllocator::drop_boxed(future_ptr) };
382 self.service.release_task(release_handle);
383 return Err(SpawnError::AttachFailed);
384 }
385
386 // Schedule the task for execution by worker threads
387 task.schedule();
388
389 // Return a join handle that the caller can await
390 Ok(JoinHandle::new(shared))
391 }
392
393 /// Initiates shutdown of the executor.
394 ///
395 /// This method:
396 /// 1. Sets the shutdown flag atomically (idempotent - safe to call multiple times)
397 /// 2. Closes the task arena to prevent new tasks
398 /// 3. Signals the worker service to shut down
399 ///
400 /// After shutdown is initiated, no new tasks can be spawned, but existing tasks
401 /// will continue to run until they complete.
402 fn shutdown(&self) {
403 // Use swap to atomically set shutdown flag and check if it was already set
404 // Return early if shutdown was already initiated
405 if self.shutdown.swap(true, Ordering::Release) {
406 return;
407 }
408
409 // Close the arena to prevent new task initialization
410 self.service.arena().close();
411
412 // Signal worker service to begin shutdown process
413 self.service.shutdown();
414 }
415}
416
417pub struct ExecutorConfig {
418 task_arena: TaskArenaConfig,
419 task_arena_options: TaskArenaOptions,
420 min_workers: usize,
421 max_workers: usize,
422}
423
424impl ExecutorConfig {
425 pub fn new(
426 task_arena: TaskArenaConfig,
427 task_arena_options: TaskArenaOptions,
428 min_workers: usize,
429 max_workers: usize,
430 ) -> ExecutorConfig {
431 Self {
432 task_arena,
433 task_arena_options,
434 min_workers,
435 max_workers,
436 }
437 }
438
439 pub fn with_max_tasks(
440 min_workers: usize,
441 max_workers: usize,
442 max_tasks: usize,
443 ) -> io::Result<Self> {
444 let min_workers = min_workers.max(1);
445 let max_workers = max_workers.max(min_workers);
446 let max_tasks = max_tasks.max(max_workers);
447 let mut tasks_per_leaf = 4096;
448 let mut leaf_count = (max_tasks / tasks_per_leaf).max(1);
449
450 if leaf_count < max_workers {
451 leaf_count = max_workers;
452 tasks_per_leaf = (max_tasks / leaf_count).max(1);
453 }
454
455 Ok(Self {
456 task_arena: TaskArenaConfig::new(leaf_count, tasks_per_leaf)?,
457 task_arena_options: TaskArenaOptions::new(false, false),
458 min_workers,
459 max_workers,
460 })
461 }
462}
463
464impl<const P: usize, const NUM_SEGS_P2: usize> Executor<P, NUM_SEGS_P2> {
465 /// Creates a new executor instance with the specified configuration.
466 ///
467 /// This method initializes:
468 /// 1. A task arena with the given configuration and options
469 /// 2. A worker service with the specified number of worker threads
470 /// 3. Internal executor state for task management
471 ///
472 /// # Arguments
473 ///
474 /// * `config`: Configuration for the task arena (capacity, layout, etc.)
475 /// * `options`: Options for arena initialization (memory mapping, etc.)
476 /// * `worker_count`: Number of worker threads to spawn for task execution
477 ///
478 /// # Returns
479 ///
480 /// * `Ok(Executor)`: Successfully created executor instance
481 /// * `Err(io::Error)`: Error if arena initialization fails (e.g., memory mapping issues)
482 ///
483 /// # Example
484 ///
485 /// ```no_run
486 /// use maniac::runtime::Executor;
487 /// use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
488 ///
489 /// let executor = Executor::new(
490 /// TaskArenaConfig::new(1, 8).unwrap(),
491 /// TaskArenaOptions::default(),
492 /// 4, // Use 4 worker threads
493 /// )?;
494 /// # Ok::<(), std::io::Error>(())
495 /// ```
496 pub fn new(
497 config: TaskArenaConfig,
498 options: TaskArenaOptions,
499 worker_count: usize,
500 max_worker_count: usize,
501 ) -> io::Result<Self> {
502 let worker_config = WorkerServiceConfig::default();
503 // Create shared tick service for time-based operations
504 // This allows multiple worker services to share a single tick thread
505 let tick_service = TickService::new(worker_config.tick_duration);
506 tick_service.start();
507
508 let mut executor = Self::new_with_tick_service(config, options, worker_count, max_worker_count, Arc::clone(&tick_service))?;
509 // Mark that this executor owns the tick service and is responsible for shutting it down
510 executor.owned_tick_service = Some(tick_service);
511 Ok(executor)
512 }
513
514 /// Creates a new executor instance with the specified configuration.
515 ///
516 /// This method initializes:
517 /// 1. A task arena with the given configuration and options
518 /// 2. A worker service with the specified number of worker threads
519 /// 3. Internal executor state for task management
520 ///
521 /// # Arguments
522 ///
523 /// * `config`: Configuration for the task arena (capacity, layout, etc.)
524 /// * `options`: Options for arena initialization (memory mapping, etc.)
525 /// * `worker_count`: Number of worker threads to spawn for task execution
526 ///
527 /// # Returns
528 ///
529 /// * `Ok(Executor)`: Successfully created executor instance
530 /// * `Err(io::Error)`: Error if arena initialization fails (e.g., memory mapping issues)
531 ///
532 /// # Example
533 ///
534 /// ```no_run
535 /// use maniac::runtime::Executor;
536 /// use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
537 ///
538 /// let executor = Executor::new(
539 /// TaskArenaConfig::new(1, 8).unwrap(),
540 /// TaskArenaOptions::default(),
541 /// 4, // Use 4 worker threads
542 /// )?;
543 /// # Ok::<(), std::io::Error>(())
544 /// ```
545 pub fn new_with_tick_service(
546 config: TaskArenaConfig,
547 options: TaskArenaOptions,
548 worker_count: usize,
549 max_worker_count: usize,
550 tick_service: Arc<TickService>,
551 ) -> io::Result<Self> {
552 let worker_count = worker_count.max(1);
553 let max_worker_count = max_worker_count.max(worker_count);
554
555 // Initialize the memory-mapped task arena with the provided configuration
556 let arena = TaskArena::with_config(config, options)?;
557
558 // Create worker service configuration
559 // Set min_workers to the desired count to ensure workers start immediately
560 // Set max_workers to at least the desired count (or CPU count, whichever is higher)
561 // This ensures workers start immediately on WorkerService::start()
562 let worker_config = WorkerServiceConfig {
563 min_workers: worker_count,
564 max_workers: max_worker_count,
565 ..WorkerServiceConfig::default()
566 };
567
568 // Start the worker service with the arena and configuration
569 // This spawns worker threads that will execute tasks
570 let service = WorkerService::<P, NUM_SEGS_P2>::start(arena, worker_config, &tick_service);
571
572 // Initialize shutdown flag to false (executor is active)
573 let shutdown = Arc::new(AtomicBool::new(false));
574
575 // Create internal executor state shared across all operations
576 let inner = Arc::new(ExecutorInner::<P, NUM_SEGS_P2> {
577 shutdown: Arc::clone(&shutdown),
578 tick_service,
579 service: Arc::clone(&service),
580 });
581
582 Ok(Self {
583 inner,
584 owned_tick_service: None, // Not owned when using an external tick service
585 })
586 }
587
588 /// Spawns an asynchronous task on the executor, returning an awaitable join handle.
589 ///
590 /// This is the primary method for scheduling async work on the executor.
591 /// The spawned task will be executed by one of the executor's worker threads.
592 ///
593 /// # Arguments
594 ///
595 /// * `future`: The future to execute. Can be any type that implements `IntoFuture`.
596 ///
597 /// # Returns
598 ///
599 /// * `Ok(JoinHandle<T>)`: A join handle that implements `Future<Output = T>`
600 /// * `Err(SpawnError)`: Error if spawning fails
601 ///
602 /// # Example
603 ///
604 /// ```no_run
605 /// # use maniac::runtime::Executor;
606 /// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
607 /// # let executor = Executor::new(
608 /// # TaskArenaConfig::new(1, 8).unwrap(),
609 /// # TaskArenaOptions::default(),
610 /// # 1,
611 /// # ).unwrap();
612 /// let handle = executor.spawn(async {
613 /// // Your async code here
614 /// 42
615 /// })?;
616 ///
617 /// // Await the result
618 /// // let result = handle.await;
619 /// # Ok::<(), maniac::runtime::task::SpawnError>(())
620 /// ```
621 #[track_caller]
622 pub fn spawn<F, T>(&self, future: F) -> Result<JoinHandle<T>, SpawnError>
623 where
624 F: IntoFuture<Output = T> + Send + 'static,
625 F::IntoFuture: Future<Output = T> + Send + 'static,
626 T: Send + 'static,
627 {
628 let location = Location::caller();
629 let type_id = TypeId::of::<F>();
630 self.inner.spawn(type_id, location, future)
631 }
632
633 /// Returns current arena statistics.
634 ///
635 /// This provides insight into the executor's current state, including:
636 /// - Number of active tasks
637 /// - Task capacity and utilization
638 /// - Other arena-specific metrics
639 ///
640 /// # Returns
641 ///
642 /// `TaskArenaStats` containing current executor statistics
643 pub fn stats(&self) -> TaskArenaStats {
644 self.inner.service.arena().stats()
645 }
646
647 /// Returns a reference to the underlying worker service.
648 ///
649 /// This allows direct access to worker service operations such as:
650 /// - Interrupting workers for preemptive scheduling
651 /// - Accessing worker statistics and health information
652 /// - Managing worker threads dynamically
653 ///
654 /// # Returns
655 ///
656 /// An `Arc` to the `WorkerService` managing this executor's workers
657 pub fn service(&self) -> &Arc<WorkerService<P, NUM_SEGS_P2>> {
658 &self.inner.service
659 }
660}
661
662impl<const P: usize, const NUM_SEGS_P2: usize> Drop for Executor<P, NUM_SEGS_P2> {
663 /// Cleans up the executor when it goes out of scope.
664 ///
665 /// This implementation:
666 /// 1. Initiates shutdown of the executor (closes arena, signals workers)
667 /// 2. If this executor owns a tick service, shuts it down as well
668 /// 3. Unparks any worker threads that might be waiting
669 /// 4. Joins all worker threads to ensure clean shutdown
670 ///
671 /// Note: Currently, worker threads are managed by `WorkerService`, so the
672 /// `workers` vector is typically empty. This code is prepared for future
673 /// scenarios where we might manage worker threads directly.
674 fn drop(&mut self) {
675 // Initiate shutdown process (idempotent)
676 self.inner.shutdown();
677
678 // If this executor owns the tick service, shut it down
679 // This must happen after worker shutdown to avoid issues with the tick handler
680 if let Some(tick_service) = &self.owned_tick_service {
681 tick_service.shutdown();
682 }
683
684 // // Unpark any waiting worker threads to allow them to exit
685 // for worker in &self.workers {
686 // worker.thread().unpark();
687 // }
688 //
689 // // Wait for all worker threads to complete
690 // // drain(..) consumes the vector and returns an iterator
691 // for handle in self.workers.drain(..) {
692 // let _ = handle.join();
693 // }
694 }
695}
696
697/// Shared state between a spawned task and its join handle.
698///
699/// This structure enables communication between:
700/// - The spawned task (which calls `complete()` when done)
701/// - The join handle (which calls `poll()` to await completion)
702///
703/// It uses a lock-free fast path (atomic `ready` flag) with a fallback to
704/// mutex-protected state for storing the result and waker.
705struct JoinShared<T> {
706 /// Atomic flag indicating if the task has completed.
707 /// Used as a fast-path check before acquiring the mutex.
708 ready: AtomicBool,
709 /// Mutex-protected state containing the result and waker.
710 /// The mutex is only held briefly during result storage/retrieval.
711 state: Mutex<JoinSharedState<T>>,
712}
713
714/// Internal state protected by the mutex in `JoinShared`.
715struct JoinSharedState<T> {
716 /// The task's result, if it has completed.
717 result: Option<T>,
718 /// The waker to notify when the task completes.
719 /// Stored here so we can wake the awaiting future when completion occurs.
720 waker: Option<Waker>,
721}
722
723impl<T> JoinShared<T> {
724 /// Creates a new `JoinShared` instance in the initial (pending) state.
725 fn new() -> Self {
726 Self {
727 ready: AtomicBool::new(false),
728 state: Mutex::new(JoinSharedState {
729 result: None,
730 waker: None,
731 }),
732 }
733 }
734
735 /// Marks the task as complete and stores the result.
736 ///
737 /// This is called by the spawned task when it finishes execution.
738 /// If a waker is registered, it will be notified to wake the awaiting future.
739 ///
740 /// # Arguments
741 ///
742 /// * `value`: The result value from the completed task
743 ///
744 /// # Safety
745 ///
746 /// This method is idempotent - calling it multiple times is safe but only
747 /// the first call will have an effect.
748 fn complete(&self, value: T) {
749 let mut state = self.state.lock().unwrap();
750
751 // Idempotency check: if result already exists, don't overwrite it
752 if state.result.is_some() {
753 return;
754 }
755
756 // Store the result
757 state.result = Some(value);
758
759 // Set the ready flag atomically (Release ordering ensures previous writes are visible)
760 self.ready.store(true, Ordering::Release);
761
762 // Take the waker (if any) and drop the lock before waking
763 // This prevents holding the lock while executing waker code
764 let waker = state.waker.take();
765 drop(state);
766
767 // Wake the awaiting future if a waker was registered
768 if let Some(waker) = waker {
769 waker.wake();
770 }
771 }
772
773 /// Polls for the task's completion.
774 ///
775 /// This is called by the `JoinHandle` future's `poll` method.
776 /// Returns `Poll::Ready(T)` if the task has completed, or `Poll::Pending` if not.
777 ///
778 /// # Arguments
779 ///
780 /// * `cx`: The async context, used to register a waker for notification
781 ///
782 /// # Returns
783 ///
784 /// * `Poll::Ready(T)`: Task has completed, returns the result
785 /// * `Poll::Pending`: Task is still running, waker has been registered
786 fn poll(&self, cx: &mut Context<'_>) -> Poll<T> {
787 // Fast path: check atomic flag first (avoid mutex if already ready)
788 if self.ready.load(Ordering::Acquire) {
789 let mut state = self.state.lock().unwrap();
790 if let Some(value) = state.result.take() {
791 return Poll::Ready(value);
792 }
793 }
794
795 // Slow path: acquire lock and check again (handles race condition)
796 // Also register waker if task is still pending
797 let mut state = self.state.lock().unwrap();
798
799 // Double-check: result might have been set between the atomic check and lock acquisition
800 if let Some(value) = state.result.take() {
801 return Poll::Ready(value);
802 }
803
804 // Task is still pending: register the waker so we can be notified when it completes
805 state.waker = Some(cx.waker().clone());
806 Poll::Pending
807 }
808
809 /// Returns `true` if the task has completed.
810 ///
811 /// This is a non-blocking check that uses the atomic flag.
812 /// Note: The result may have already been taken by a previous poll.
813 fn is_finished(&self) -> bool {
814 self.ready.load(Ordering::Acquire)
815 }
816}
817
818/// Awaitable join handle returned from [`Executor::spawn`].
819///
820/// This handle implements `Future<Output = T>` and can be awaited to get the
821/// result of the spawned task. The handle can be cloned and shared, allowing
822/// multiple futures to await the same task.
823///
824/// # Example
825///
826/// ```no_run
827/// # use maniac::runtime::Executor;
828/// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
829/// # let executor = Executor::new(
830/// # TaskArenaConfig::new(1, 8).unwrap(),
831/// # TaskArenaOptions::default(),
832/// # 1,
833/// # ).unwrap();
834/// let handle = executor.spawn(async { 42 })?;
835///
836/// // Await the result
837/// let result = handle.await;
838/// # Ok::<(), maniac::runtime::task::SpawnError>(())
839/// ```
840pub struct JoinHandle<T> {
841 /// Shared state with the spawned task for synchronization
842 shared: Arc<JoinShared<T>>,
843}
844
845impl<T> JoinHandle<T> {
846 /// Creates a new join handle from shared state.
847 ///
848 /// This is an internal constructor used by `ExecutorInner::spawn`.
849 fn new(shared: Arc<JoinShared<T>>) -> Self {
850 Self { shared }
851 }
852
853 /// Returns `true` if the task has completed.
854 ///
855 /// This is a non-blocking check. Note that even if `is_finished()` returns
856 /// `true`, the result may have already been consumed by a previous await.
857 ///
858 /// # Example
859 ///
860 /// ```no_run
861 /// # use maniac::runtime::Executor;
862 /// # use maniac::runtime::task::{TaskArenaConfig, TaskArenaOptions};
863 /// # let executor = Executor::new(
864 /// # TaskArenaConfig::new(1, 8).unwrap(),
865 /// # TaskArenaOptions::default(),
866 /// # 1,
867 /// # ).unwrap();
868 /// let handle = executor.spawn(async { 42 })?;
869 ///
870 /// // Check if task is done (non-blocking)
871 /// if handle.is_finished() {
872 /// println!("Task completed!");
873 /// }
874 /// # Ok::<(), maniac::runtime::task::SpawnError>(())
875 /// ```
876 pub fn is_finished(&self) -> bool {
877 self.shared.is_finished()
878 }
879}
880
881impl<T: Send + 'static> Future for JoinHandle<T> {
882 type Output = T;
883
884 /// Polls the join handle for completion.
885 ///
886 /// This method:
887 /// - Returns `Poll::Ready(T)` if the task has completed
888 /// - Returns `Poll::Pending` if the task is still running
889 /// - Registers a waker to be notified when the task completes
890 ///
891 /// # Arguments
892 ///
893 /// * `cx`: The async context containing the waker
894 ///
895 /// # Returns
896 ///
897 /// * `Poll::Ready(T)`: Task completed, returns the result
898 /// * `Poll::Pending`: Task still running, will wake when done
899 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
900 self.shared.poll(cx)
901 }
902}
903
904#[cfg(test)]
905mod tests {
906 use super::*;
907 use crate::future::block_on;
908 use std::future::Future;
909 use std::sync::atomic::{AtomicUsize, Ordering};
910 use std::task::{Context, RawWaker, RawWakerVTable};
911 use std::time::Duration;
912
913 fn create_executor() -> DefaultExecutor {
914 DefaultExecutor::new_single_threaded()
915 }
916
917 /// Creates a no-op waker for testing purposes.
918 ///
919 /// This waker does nothing when woken, which is useful for testing
920 /// scenarios where we don't need actual async executor behavior.
921 fn noop_waker() -> Waker {
922 unsafe fn clone(_: *const ()) -> RawWaker {
923 RawWaker::new(std::ptr::null(), &VTABLE)
924 }
925 unsafe fn wake(_: *const ()) {}
926 static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake, wake);
927 unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
928 }
929
930 /// Tests that a spawned task completes successfully and returns the correct result.
931 ///
932 /// This test:
933 /// 1. Creates a executor with a single worker thread
934 /// 2. Spawns a task that increments an atomic counter
935 /// 3. Awaits the task completion
936 /// 4. Verifies the result and that the counter was incremented
937 /// 5. Verifies that the task was properly cleaned up (active_tasks == 0)
938 #[test]
939 fn executor_spawn_completes() {
940 // Create a executor with minimal configuration for testing
941 let executor = create_executor();
942
943 // Create a shared counter to verify task execution
944 let counter = Arc::new(AtomicUsize::new(0));
945 let cloned = counter.clone();
946
947 // Spawn a task that increments the counter and returns the new value
948 let join = executor
949 .spawn(async move { cloned.fetch_add(1, Ordering::Relaxed) + 1 })
950 .expect("spawn");
951
952 // Block until the task completes and get the result
953 let result = block_on(join);
954
955 // Verify the result is correct
956 assert_eq!(result, 1);
957 // Verify the counter was actually incremented
958 assert_eq!(counter.load(Ordering::Relaxed), 1);
959 // Verify the task was cleaned up after completion
960 assert_eq!(executor.stats().active_tasks, 0);
961 }
962
963 /// Tests that `JoinHandle::is_finished()` correctly reports task completion status.
964 ///
965 /// This test:
966 /// 1. Creates a executor and spawns a simple task
967 /// 2. Verifies `is_finished()` returns `false` before completion
968 /// 3. Awaits the task to completion
969 /// 4. Verifies that the task was properly cleaned up
970 #[test]
971 fn join_handle_reports_completion() {
972 // Create a executor with minimal configuration
973 let executor = create_executor();
974
975 // Spawn an empty task (just completes immediately)
976 let join = executor.spawn(async {}).expect("spawn");
977
978 // Verify task is not finished immediately after spawning
979 // (though it may complete very quickly, this check happens before awaiting)
980 assert!(!join.is_finished());
981
982 // Await the task to completion
983 block_on(join);
984
985 // Verify the task was cleaned up after completion
986 assert_eq!(executor.stats().active_tasks, 0);
987 }
988}