maniac_runtime/runtime/
task.rs

1use super::summary::Summary;
2use super::timer::TimerHandle;
3use super::worker::Worker;
4use crate::utils::bits;
5use std::cell::UnsafeCell;
6use std::fmt;
7use std::future::{Future, IntoFuture};
8use std::io;
9use std::pin::Pin;
10use std::ptr::{self, NonNull};
11use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, AtomicU64, Ordering};
12use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
13use std::time::{Duration, Instant};
14
15#[cfg(unix)]
16use libc::{MAP_ANONYMOUS, MAP_FAILED, MAP_PRIVATE, PROT_READ, PROT_WRITE, mmap, munmap};
17
18#[cfg(target_os = "linux")]
19use libc::{MAP_HUGE_2MB, MAP_HUGETLB};
20
21#[cfg(windows)]
22use winapi::um::memoryapi::{VirtualAlloc, VirtualFree};
23#[cfg(windows)]
24use winapi::um::winnt::{MEM_COMMIT, MEM_LARGE_PAGES, MEM_RELEASE, MEM_RESERVE, PAGE_READWRITE};
25
/// The type-erased, pinned future a task executes; always heap-allocated and
/// `Send` so it can migrate between worker threads.
pub type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// Task lifecycle flags. SCHEDULED and EXECUTING are independent bits, so
// state 3 means "currently executing with a re-schedule pending" — see
// `Task::schedule` / `Task::finish` for how that handoff works.
pub const TASK_IDLE: u8 = 0;
pub const TASK_SCHEDULED: u8 = 1;
pub const TASK_EXECUTING: u8 = 2;
pub const TASK_SCHEDULED_AND_EXECUTING: u8 = 3;
32
/// Behavioral toggles applied when constructing a task arena.
///
/// Both flags default to `false`; `#[derive(Default)]` replaces the previous
/// hand-written `Default` impl, which produced exactly these values.
#[derive(Clone, Copy, Debug, Default)]
pub struct TaskArenaOptions {
    /// Request huge-page backing for the arena's memory mapping when the
    /// platform supports it.
    pub use_huge_pages: bool,
    /// Eagerly construct all tasks up front instead of initializing lazily.
    pub preinitialize_tasks: bool,
}

impl TaskArenaOptions {
    /// Creates options with both flags set explicitly.
    pub fn new(use_huge_pages: bool, preinitialize_tasks: bool) -> Self {
        Self {
            use_huge_pages,
            preinitialize_tasks,
        }
    }

    /// Builder-style setter for huge-page usage.
    pub fn with_huge_pages(mut self, huge_pages: bool) -> Self {
        self.use_huge_pages = huge_pages;
        self
    }

    /// Builder-style setter for eager task initialization.
    pub fn with_preinitialized_tasks(mut self, enabled: bool) -> Self {
        self.preinitialize_tasks = enabled;
        self
    }
}
66
/// Sizing parameters for a task arena.
#[derive(Clone, Copy, Debug)]
pub struct TaskArenaConfig {
    /// Number of leaves; `new` rounds this up to a power of two.
    pub leaf_count: usize,
    /// Tasks per leaf; deliberately NOT rounded to a power of two.
    pub tasks_per_leaf: usize,
    /// Worker cap; clamped to `1..=leaf_count` by `with_max_workers`.
    pub max_workers: usize,
}

impl TaskArenaConfig {
    /// Creates a config, rounding `leaf_count` up to the next power of two
    /// and defaulting `max_workers` to the (rounded) leaf count.
    ///
    /// # Errors
    /// Returns `io::ErrorKind::InvalidInput` when either count is zero.
    /// The zero check runs *before* rounding: `0usize.next_power_of_two()`
    /// returns 1, so validating afterwards (as the previous version did)
    /// silently turned an invalid `leaf_count` of 0 into a one-leaf arena.
    pub fn new(leaf_count: usize, tasks_per_leaf: usize) -> io::Result<Self> {
        if leaf_count == 0 || tasks_per_leaf == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "leaf_count and tasks_per_leaf must be > 0",
            ));
        }
        // Identity for values that are already powers of two.
        let leaf_count = leaf_count.next_power_of_two();
        Ok(Self {
            leaf_count,
            tasks_per_leaf,
            max_workers: leaf_count,
        })
    }

    /// Caps the worker count; the result is always in `1..=leaf_count`.
    /// (Kept as `max(1).min(..)` rather than `clamp` so a hand-built config
    /// with `leaf_count == 0` still resolves to 0 instead of panicking.)
    pub fn with_max_workers(mut self, workers: usize) -> Self {
        self.max_workers = workers.max(1).min(self.leaf_count);
        self
    }
}
104
/// A 64-bit bitset of runnable-task flags — one bit per task within a leaf.
/// Workers acquire bits to claim tasks; producers set bits to publish work.
#[repr(C)]
#[derive(Debug)]
pub struct TaskSignal {
    value: AtomicU64,
}

impl TaskSignal {
    /// Creates an empty signal word (no bits set). `const` so it can be used
    /// in static initializers.
    pub const fn new() -> Self {
        Self {
            value: AtomicU64::new(0),
        }
    }

    /// Raw load of the whole word with a caller-chosen ordering.
    #[inline]
    pub fn load(&self, ordering: Ordering) -> u64 {
        self.value.load(ordering)
    }

    /// Returns whether `bit_index` is currently set. Relaxed snapshot only —
    /// the answer may be stale by the time the caller acts on it.
    #[inline(always)]
    pub fn is_set(&self, bit_index: u64) -> bool {
        let mask = 1u64 << bit_index;
        (self.value.load(Ordering::Relaxed) & mask) != 0
    }

    /// Sets `bit_index`. Returns `(word_was_empty, bit_was_newly_set)`:
    /// the first component tells callers whether the summary tree may need a
    /// "leaf became active" update; the second is false if the bit was
    /// already set by someone else.
    #[inline(always)]
    pub fn set(&self, bit_index: u64) -> (bool, bool) {
        let mask = 1u64 << bit_index;
        let prev = self.value.fetch_or(mask, Ordering::AcqRel);
        // was empty; was_set
        (prev == 0, (prev & mask) == 0)
    }

    /// Clears `bit_index`. Returns `(remaining_bits, word_now_empty)`.
    #[inline]
    pub fn clear(&self, bit_index: u64) -> (u64, bool) {
        let mask = 1u64 << bit_index;
        let previous = self.value.fetch_and(!mask, Ordering::AcqRel);
        let remaining = previous & !mask;
        (remaining, remaining == 0)
    }

    /// Attempts to atomically claim (clear) `bit_index`.
    /// Returns `(remaining_bits, acquired)`.
    ///
    /// NOTE(review): the relaxed fast-path miss returns `(0, false)` without
    /// reporting the actual remaining bits — callers must not interpret that
    /// 0 as "word is empty".
    #[inline]
    pub fn try_acquire(&self, bit_index: u64) -> (u64, bool) {
        if !self.is_set(bit_index) {
            return (0, false);
        }
        let mask = 1u64 << bit_index;
        let (_, previous, acquired) = bits::try_acquire(&self.value, bit_index as u64);
        let remaining = previous & !mask;
        (remaining, acquired)
    }

    /// Scans for and claims a set bit, preferring bits near `start_bit`
    /// (clamped to 0..=63). Retries up to 64 times when racing with other
    /// acquirers, then gives up; returns the claimed bit index and the bits
    /// remaining after the claim, or `None` if the word is (or appears) empty.
    ///
    /// NOTE(review): assumes `bits::find_nearest` returns a value >= 64 when
    /// no suitable candidate exists near `start` — confirm against
    /// `utils::bits`; the fallback then takes the lowest set bit.
    #[inline]
    pub fn try_acquire_from(&self, start_bit: u64) -> Option<(u32, u64)> {
        let start = (start_bit as u64).min(63);
        for _ in 0..64 {
            let current = self.value.load(Ordering::Acquire);
            if current == 0 {
                return None;
            }

            let candidate = bits::find_nearest(current, start);
            let bit_index = if candidate < 64 {
                candidate as u32
            } else {
                current.trailing_zeros()
            };

            let (bit_mask, previous, acquired) = bits::try_acquire(&self.value, bit_index as u64);
            if !acquired {
                // Lost the race for this bit; back off briefly and rescan.
                std::hint::spin_loop();
                continue;
            }

            let remaining = previous & !bit_mask;
            return Some((bit_index, remaining));
        }
        None
    }
}
184
/// A copyable, non-owning pointer to a `Task` living in arena memory.
///
/// NOTE(review): dereferencing assumes the arena (and thus the task) outlives
/// every handle — confirm handles cannot escape the arena's lifetime.
#[derive(Clone, Copy, Debug)]
pub struct TaskHandle(NonNull<Task>);

impl TaskHandle {
    /// Wraps a task reference; the reference's lifetime is erased.
    #[inline(always)]
    pub fn from_task(task: &Task) -> Self {
        TaskHandle(NonNull::from(task))
    }

    /// Wraps an already non-null task pointer.
    #[inline(always)]
    pub fn from_non_null(task: NonNull<Task>) -> Self {
        TaskHandle(task)
    }

    /// Raw pointer form, for FFI-style storage or atomics.
    #[inline(always)]
    pub fn as_ptr(&self) -> *mut Task {
        self.0.as_ptr()
    }

    /// The underlying `NonNull` pointer.
    #[inline(always)]
    pub fn as_non_null(&self) -> NonNull<Task> {
        self.0
    }

    /// Borrows the pointed-to task.
    ///
    /// SAFETY: sound only while the arena backing this task is alive; the
    /// returned lifetime is tied to `self`, not the arena.
    #[inline(always)]
    pub fn task(&self) -> &Task {
        unsafe { self.0.as_ref() }
    }

    /// Leaf index of the task, widened to usize for indexing.
    #[inline(always)]
    pub fn leaf_idx(&self) -> usize {
        self.task().leaf_idx as usize
    }

    /// Signal-word index within the leaf.
    #[inline(always)]
    pub fn signal_idx(&self) -> usize {
        self.task().signal_idx as usize
    }

    /// Bit position within the signal word.
    #[inline(always)]
    pub fn bit_idx(&self) -> u8 {
        self.task().signal_bit
    }

    /// Arena-wide task id. The parameter is unused (the id is stored on the
    /// task directly); kept for call-site compatibility.
    #[inline(always)]
    pub fn global_id(&self, _tasks_per_leaf: usize) -> u32 {
        self.task().global_id()
    }
}

// SAFETY: TaskHandle is a plain pointer; `Task` itself is declared Send +
// Sync below, and arena memory is presumed to outlive all workers — TODO
// confirm that invariant where handles are shared across threads.
unsafe impl Send for TaskHandle {}
unsafe impl Sync for TaskHandle {}
237
238#[repr(C)]
239pub struct TaskSlot {
240    task_ptr: AtomicPtr<Task>,
241    active_task_ptr: AtomicPtr<Task>,
242}
243
244impl TaskSlot {
245    #[inline(always)]
246    pub fn new(task_ptr: *mut Task) -> Self {
247        Self {
248            task_ptr: AtomicPtr::new(task_ptr),
249            active_task_ptr: AtomicPtr::new(ptr::null_mut()),
250        }
251    }
252
253    #[inline(always)]
254    pub fn task_ptr(&self) -> *mut Task {
255        self.task_ptr.load(Ordering::Acquire)
256    }
257
258    #[inline(always)]
259    pub fn set_task_ptr(&self, ptr: *mut Task) {
260        self.task_ptr.store(ptr, Ordering::Release);
261    }
262
263    #[inline(always)]
264    pub fn task_ptr_compare_exchange(
265        &self,
266        current: *mut Task,
267        new: *mut Task,
268    ) -> Result<*mut Task, *mut Task> {
269        self.task_ptr
270            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
271    }
272
273    #[inline(always)]
274    pub fn clear_task_ptr(&self) {
275        self.task_ptr.store(ptr::null_mut(), Ordering::Release);
276        self.active_task_ptr
277            .store(ptr::null_mut(), Ordering::Release);
278    }
279
280    #[inline(always)]
281    pub fn set_active_task_ptr(&self, ptr: *mut Task) {
282        self.active_task_ptr.store(ptr, Ordering::Release);
283    }
284
285    #[inline(always)]
286    pub fn active_task_ptr(&self) -> *mut Task {
287        self.active_task_ptr.load(Ordering::Acquire)
288    }
289
290    #[inline(always)]
291    pub fn clear_active_task_ptr(&self) {
292        self.active_task_ptr
293            .store(ptr::null_mut(), Ordering::Release);
294    }
295}
296
297#[derive(Debug)]
298pub struct ArenaLayout {
299    task_slot_offset: usize,
300    task_offset: usize,
301    total_size: usize,
302    pub signals_per_leaf: usize,
303}
304
305impl ArenaLayout {
306    fn new(config: &TaskArenaConfig) -> Self {
307        let signals_per_leaf = (config.tasks_per_leaf + 63) / 64;
308
309        // Only allocate task slots and tasks in mmap
310        let task_slot_size =
311            config.leaf_count * config.tasks_per_leaf * std::mem::size_of::<TaskSlot>();
312        let task_size = config.leaf_count * config.tasks_per_leaf * std::mem::size_of::<Task>();
313
314        let mut offset = 0usize;
315        let task_slot_offset = offset;
316        offset += task_slot_size;
317        let task_offset = offset;
318        offset += task_size;
319
320        Self {
321            task_slot_offset,
322            task_offset,
323            total_size: offset,
324            signals_per_leaf,
325        }
326    }
327}
328
329pub(crate) struct FutureAllocator;
330
331impl FutureAllocator {
332    #[inline(always)]
333    pub fn box_future<F>(future: F) -> *mut ()
334    where
335        F: Future<Output = ()> + Send + 'static,
336    {
337        let boxed: BoxFuture = Box::pin(future);
338        Box::into_raw(Box::new(boxed)) as *mut ()
339    }
340
341    #[inline(always)]
342    pub unsafe fn drop_boxed(ptr: *mut ()) {
343        if ptr.is_null() {
344            return;
345        }
346        unsafe {
347            drop(Box::from_raw(ptr as *mut BoxFuture));
348        }
349    }
350
351    #[inline(always)]
352    pub unsafe fn poll_boxed(ptr: *mut (), cx: &mut Context<'_>) -> Option<Poll<()>> {
353        if ptr.is_null() {
354            return None;
355        }
356        unsafe {
357            let future = &mut *(ptr as *mut BoxFuture);
358            Some(future.as_mut().poll(cx))
359        }
360    }
361}
362
/// Per-task execution counters.
///
/// Lives inside `Task` behind an `UnsafeCell`; per the executor's contract
/// only the owning worker thread mutates these, other threads may only copy.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct TaskStats {
    /// Times the task's future has been polled.
    pub polls: u32,
    /// Yields recorded (cooperative or preemptive).
    pub yields: u32,
    /// Accumulated poll wall-clock time in nanoseconds; only advances when
    /// CPU-time tracking is enabled on the task.
    pub cpu_time_ns: u64,
}

impl TaskStats {
    /// Resets every counter to zero.
    #[inline(always)]
    pub fn reset(&mut self) {
        // Whole-struct overwrite instead of zeroing field-by-field; the type
        // derives Default, and this stays correct if fields are added.
        *self = Self::default();
    }
}
379
/// A single schedulable task, constructed in place inside arena memory.
#[repr(C)]
pub struct Task {
    // Arena-wide identifier; see `global_id()`.
    global_id: u32,
    // Location coordinates: leaf index, signal word within the leaf, and bit
    // within that word.
    leaf_idx: u16,
    signal_idx: u8,
    signal_bit: u8,
    // Lifecycle bit flags: TASK_IDLE / TASK_SCHEDULED / TASK_EXECUTING.
    state: AtomicU8,
    // Set by yield wakers / preemption to ask the executor to stop polling.
    yielded: AtomicBool,
    // When true, `poll_future` measures elapsed poll time into `stats`.
    cpu_time_enabled: AtomicBool,
    // Signal word holding this task's runnable bit; must outlive the task.
    signal_ptr: *const TaskSignal,
    // Back-pointer to the owning slot; also used as the waker data pointer.
    slot_ptr: AtomicPtr<TaskSlot>,
    // Summary tree notified when this task becomes runnable; null until
    // `bind_summary_tree` is called.
    summary_tree_ptr: *const Summary,
    // Type-erased `Box<BoxFuture>` from `FutureAllocator::box_future`.
    future_ptr: AtomicPtr<()>,
    // Generator pinning support: when a task pins a generator, it gets dedicated execution
    pinned_generator_ptr: AtomicPtr<()>,
    // Generator run mode: 0 = None, 1 = Switch (preempted), 2 = Poll (explicit/loop)
    generator_run_mode: AtomicU8,
    // Safety: stats are mutated without synchronization based on executor guarantees that
    // only the owning worker thread records updates while other threads may only clone/copy.
    stats: UnsafeCell<TaskStats>,
}

// SAFETY: the raw pointers make Task !Send/!Sync by default. Soundness rests
// on the executor's invariants: non-atomic fields are written only during
// construct/reset (single-threaded phases), and `stats` is mutated only by
// the owning worker — NOTE(review): confirm these invariants hold everywhere
// tasks are shared.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
404
/// Generator run modes
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GeneratorRunMode {
    /// No generator pinned
    None = 0,
    /// Switch mode: Generator has interrupted poll on stack (one-shot resume)
    Switch = 1,
    /// Poll mode: Generator contains poll loop (multi-shot resume)
    Poll = 2,
}

impl GeneratorRunMode {
    /// Decodes a raw discriminant as stored in a task's atomic run-mode byte.
    /// Unknown values fall back to `None`, matching the task-side decoder, so
    /// the two decode paths cannot drift apart.
    #[inline(always)]
    pub fn from_u8(raw: u8) -> Self {
        match raw {
            1 => Self::Switch,
            2 => Self::Poll,
            _ => Self::None,
        }
    }
}
416
impl Task {
    /// Vtable for normal wakers: `wake`/`wake_by_ref` reschedule the task via
    /// `schedule()`. The waker data pointer is the task's `TaskSlot`, not the
    /// task itself, so a waker held across task recycling resolves through
    /// the slot to whatever task currently occupies it (or to nothing).
    const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::waker_clone,
        Self::waker_wake,
        Self::waker_wake_by_ref,
        Self::waker_drop,
    );

    /// Vtable for yield wakers: waking only sets the task's `yielded` flag
    /// instead of rescheduling.
    /// NOTE(review): `waker_clone` is shared with `WAKER_VTABLE`, so cloning
    /// a yield waker produces a normal *scheduling* waker — confirm this
    /// downgrade-on-clone is intentional.
    const WAKER_YIELD_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::waker_clone,
        Self::waker_yield_wake,
        Self::waker_yield_wake_by_ref,
        Self::waker_drop,
    );

    /// Writes a fresh `Task` into uninitialized memory at `ptr` and links the
    /// owning slot back to it.
    ///
    /// # Safety
    /// `ptr` must be valid for a write of `Task`; `slot_ptr` must point to a
    /// live `TaskSlot`; `signal_ptr` must remain valid for the task's
    /// lifetime.
    unsafe fn construct(
        ptr: *mut Task,
        global_id: u32,
        leaf_idx: u16,
        signal_idx: u8,
        signal_bit: u8,
        signal_ptr: *const TaskSignal,
        slot_ptr: *mut TaskSlot,
    ) {
        unsafe {
            ptr::write(
                ptr,
                Task {
                    global_id,
                    leaf_idx,
                    signal_idx,
                    signal_bit,
                    signal_ptr,
                    slot_ptr: AtomicPtr::new(slot_ptr),
                    summary_tree_ptr: ptr::null(),
                    state: AtomicU8::new(TASK_IDLE),
                    yielded: AtomicBool::new(false),
                    cpu_time_enabled: AtomicBool::new(false),
                    future_ptr: AtomicPtr::new(ptr::null_mut()),
                    pinned_generator_ptr: AtomicPtr::new(ptr::null_mut()),
                    generator_run_mode: AtomicU8::new(GeneratorRunMode::None as u8),
                    stats: UnsafeCell::new(TaskStats::default()),
                },
            );
            // Back-link so wakers (which carry the slot pointer) can find us.
            (*slot_ptr).set_task_ptr(ptr);
        }
    }

    /// Bind this task to a SummaryTree for signaling when it becomes runnable.
    ///
    /// # Safety
    /// The summary_tree pointer must remain valid for the lifetime of this task.
    /// This must only be called once during initialization.
    #[inline]
    unsafe fn bind_summary_tree(&mut self, summary_tree: *const Summary) {
        self.summary_tree_ptr = summary_tree;
    }

    /// Arena-wide task identifier assigned at construct/reset time.
    pub fn global_id(&self) -> u32 {
        self.global_id
    }

    /// Leaf this task belongs to.
    #[inline(always)]
    pub fn leaf_idx(&self) -> u16 {
        self.leaf_idx
    }

    /// Signal-word index within the leaf.
    #[inline(always)]
    pub fn signal_idx(&self) -> u8 {
        self.signal_idx
    }

    /// Bit position within the signal word.
    #[inline(always)]
    pub fn signal_bit(&self) -> u8 {
        self.signal_bit
    }

    /// Copies out the current stats snapshot. Unsynchronized read: may tear
    /// relative to the owning worker's concurrent updates (see the `stats`
    /// field's safety comment).
    #[inline(always)]
    pub fn stats(&self) -> TaskStats {
        unsafe { *self.stats.get() }
    }

    /// Enables/disables per-poll elapsed-time accounting in `poll_future`.
    #[inline(always)]
    pub fn set_cpu_time_tracking(&self, enabled: bool) {
        self.cpu_time_enabled.store(enabled, Ordering::Relaxed);
    }

    /// Bumps the poll counter. Owning-worker-only, per the stats contract.
    #[inline(always)]
    fn record_poll(&self) {
        unsafe {
            let stats = &mut *self.stats.get();
            stats.polls = stats.polls.saturating_add(1);
        }
    }

    /// Bumps the yield counter. Owning-worker-only, per the stats contract.
    #[inline(always)]
    pub(crate) fn record_yield(&self) {
        unsafe {
            let stats = &mut *self.stats.get();
            stats.yields = stats.yields.saturating_add(1);
        }
    }

    /// Accumulates poll duration into `cpu_time_ns`, saturating at u64::MAX.
    #[inline(always)]
    fn record_cpu_time(&self, duration: Duration) {
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        unsafe {
            let stats = &mut *self.stats.get();
            stats.cpu_time_ns = stats.cpu_time_ns.saturating_add(nanos);
        }
    }

    /// The slot currently owning this task, if any.
    #[inline(always)]
    pub fn slot(&self) -> Option<NonNull<TaskSlot>> {
        NonNull::new(self.slot_ptr.load(Ordering::Acquire))
    }

    /// Detaches this task from its slot (wakers resolving through the old
    /// slot will then see whatever the slot holds, not this task).
    #[inline(always)]
    pub fn clear_slot(&self) {
        self.slot_ptr.store(ptr::null_mut(), Ordering::Release);
    }

    /// Direct access to the lifecycle state flags.
    #[inline(always)]
    pub fn state(&self) -> &AtomicU8 {
        &self.state
    }

    /// Requests execution of this task (IDLE -> SCHEDULED).
    ///
    /// Idempotent: if the SCHEDULED bit is already observed set, this returns
    /// immediately. Otherwise the SCHEDULED bit is OR-ed in; when the task
    /// was neither scheduled nor executing, its bit is set in the leaf signal
    /// word and — on a fresh set — propagated to the summary tree so a worker
    /// discovers it. If the task is currently EXECUTING, only the SCHEDULED
    /// flag is set; `finish()` detects that and re-arms the signal, so no
    /// wakeup is lost.
    ///
    /// Memory ordering: the initial `Acquire` load observes the latest state;
    /// the `fetch_or(Release)` publishes work enqueued before the wake to the
    /// executor that will run the task.
    #[inline(always)]
    pub fn schedule(&self) {
        if (self.state.load(Ordering::Acquire) & TASK_SCHEDULED) != TASK_IDLE {
            return;
        }

        let previous_flags = self.state.fetch_or(TASK_SCHEDULED, Ordering::Release);
        let scheduled_nor_executing =
            (previous_flags & (TASK_SCHEDULED | TASK_EXECUTING)) == TASK_IDLE;

        if scheduled_nor_executing {
            let signal = unsafe { &*self.signal_ptr };
            let (_was_empty, was_set) = signal.set(self.signal_bit as u64);
            if was_set && !self.summary_tree_ptr.is_null() {
                unsafe {
                    (*self.summary_tree_ptr)
                        .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
                }
            }
        }
    }

    /// Attempts a direct IDLE -> EXECUTING transition, bypassing SCHEDULED
    /// (used to run a task inline on the caller's thread). On failure returns
    /// the state that was observed instead of IDLE.
    #[inline(always)]
    pub(crate) fn try_begin_inline(&self) -> Result<(), u8> {
        self.state
            .compare_exchange(
                TASK_IDLE,
                TASK_EXECUTING,
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .map(|_| ())
            .map_err(|current| current)
    }

    /// Marks the task as EXECUTING (SCHEDULED -> EXECUTING).
    ///
    /// Unconditional store: clears any SCHEDULED flag and sets EXECUTING. If
    /// a producer calls `schedule()` between `begin()` and `finish()`, the
    /// SCHEDULED flag is set again (state 3 = EXECUTING | SCHEDULED), which
    /// `finish()` detects and turns into a re-arm of the signal bit.
    ///
    /// `Release` ordering publishes the transition to concurrent schedulers.
    #[inline(always)]
    pub(crate) fn begin(&self) {
        self.state.store(TASK_EXECUTING, Ordering::Release);
    }

    /// Ends an execution (EXECUTING -> IDLE, or -> SCHEDULED on a concurrent
    /// wake).
    ///
    /// Clears the EXECUTING bit via `fetch_sub`; if the returned flags show a
    /// SCHEDULED bit (a producer woke the task while it ran), the signal bit
    /// is re-set and the summary tree updated so the work is not lost:
    ///
    /// ```text
    /// T0: begin()              -> EXECUTING (2)
    /// T1: schedule()           -> EXECUTING | SCHEDULED (3)
    /// T2: finish()             -> SCHEDULED (1), signal bit re-armed
    /// T3: worker sees the bit and runs the task again
    /// ```
    ///
    /// `AcqRel`: Acquire to see producer writes, Release to publish the
    /// transition.
    #[inline(always)]
    pub(crate) fn finish(&self) {
        let after_flags = self.state.fetch_sub(TASK_EXECUTING, Ordering::AcqRel);
        if after_flags & TASK_SCHEDULED != TASK_IDLE {
            let signal = unsafe { &*self.signal_ptr };
            let (_was_empty, was_set) = signal.set(self.signal_bit as u64);
            if was_set && !self.summary_tree_ptr.is_null() {
                unsafe {
                    (*self.summary_tree_ptr)
                        .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
                }
            }
        }
    }

    /// Ends an execution while keeping the task runnable
    /// (EXECUTING -> SCHEDULED in one store), used to yield the timeslice
    /// when the executor knows more work remains. Cheaper than a separate
    /// `finish()` + `schedule()`.
    ///
    /// NOTE(review): the summary tree is only updated when `was_empty &&
    /// was_set`, whereas `schedule()`/`finish()` update it on `was_set`
    /// alone — if the signal word was non-empty but this bit was clear, the
    /// summary may be skipped here. Confirm whether the stricter condition is
    /// intentional (e.g. because a non-empty word is already marked active).
    #[inline(always)]
    pub(crate) fn finish_and_schedule(&self) {
        self.state.store(TASK_SCHEDULED, Ordering::Release);
        let signal = unsafe { &*self.signal_ptr };
        let (was_empty, was_set) = signal.set(self.signal_bit as u64);
        if was_empty && was_set && !self.summary_tree_ptr.is_null() {
            unsafe {
                (*self.summary_tree_ptr)
                    .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
            }
        }
    }

    /// Clears the yield flag before the next poll.
    #[inline(always)]
    pub(crate) fn clear_yielded(&self) {
        self.yielded.store(false, Ordering::Relaxed);
    }
    
    /// Mark this task as yielded (for preemption or cooperative yields)
    #[inline(always)]
    pub(crate) fn mark_yielded(&self) {
        self.yielded.store(true, Ordering::Relaxed);
    }

    /// Whether a yield has been requested since the last `clear_yielded`.
    #[inline(always)]
    pub fn is_yielded(&self) -> bool {
        self.yielded.load(Ordering::Relaxed)
    }

    /// Builds a yield-mode waker (wake sets `yielded` instead of
    /// rescheduling) whose data pointer is this task's slot.
    ///
    /// # Safety
    /// The task must currently have a slot; the returned waker must not be
    /// used after the slot is torn down.
    #[inline(always)]
    pub unsafe fn waker_yield(&self) -> Waker {
        let slot_ptr = self.slot_ptr.load(Ordering::Acquire);
        debug_assert!(!slot_ptr.is_null(), "task is missing slot pointer");
        let ptr = slot_ptr as *const ();
        unsafe { Waker::from_raw(RawWaker::new(ptr, &Self::WAKER_YIELD_VTABLE)) }
    }

    /// Clone handler shared by both vtables; always clones into the normal
    /// scheduling vtable (see the NOTE on `WAKER_YIELD_VTABLE`).
    #[inline(always)]
    unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
        RawWaker::new(ptr, &Self::WAKER_VTABLE)
    }

    /// Yield-waker wake (by value): sets `yielded` on the slot's current
    /// task; no-op if the slot is empty.
    #[inline(always)]
    unsafe fn waker_yield_wake(ptr: *const ()) {
        let slot = unsafe { &*(ptr as *const TaskSlot) };
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = unsafe { &*task_ptr };
        task.yielded.store(true, Ordering::Relaxed);
    }

    /// Yield-waker wake (by reference): same behavior as `waker_yield_wake`.
    #[inline(always)]
    unsafe fn waker_yield_wake_by_ref(ptr: *const ()) {
        let slot = unsafe { &*(ptr as *const TaskSlot) };
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = unsafe { &*task_ptr };
        task.yielded.store(true, Ordering::Relaxed);
    }

    /// Normal wake (by value): reschedules the slot's current task; no-op if
    /// the slot is empty (stale waker).
    #[inline(always)]
    unsafe fn waker_wake(ptr: *const ()) {
        let slot = unsafe { &*(ptr as *const TaskSlot) };
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = unsafe { &*task_ptr };
        task.schedule();
    }

    /// Normal wake (by reference): same behavior as `waker_wake`.
    #[inline(always)]
    unsafe fn waker_wake_by_ref(ptr: *const ()) {
        let slot = unsafe { &*(ptr as *const TaskSlot) };
        let task_ptr = slot.task_ptr();
        if task_ptr.is_null() {
            return;
        }
        let task = unsafe { &*task_ptr };
        task.schedule();
    }

    /// Wakers hold no owned resources (the slot is arena memory), so drop is
    /// a no-op.
    #[inline(always)]
    unsafe fn waker_drop(_: *const ()) {}

    /// Polls the attached future, recording a poll (and elapsed wall-clock
    /// time when tracking is enabled). Returns `None` when no future is
    /// attached.
    ///
    /// # Safety
    /// Only the worker currently executing this task may call this; the
    /// future pointer must not be freed or polled concurrently.
    #[inline(always)]
    pub unsafe fn poll_future(&self, cx: &mut Context<'_>) -> Option<Poll<()>> {
        let ptr = self.future_ptr.load(Ordering::Acquire);
        if ptr.is_null() {
            return None;
        }
        self.record_poll();
        if self.cpu_time_enabled.load(Ordering::Relaxed) {
            // Wall-clock measurement around the poll; stored as "cpu time".
            let start = Instant::now();
            let result = unsafe { FutureAllocator::poll_boxed(ptr, cx) };
            self.record_cpu_time(start.elapsed());
            result
        } else {
            unsafe { FutureAllocator::poll_boxed(ptr, cx) }
        }
    }

    /// Attaches a type-erased future (from `FutureAllocator::box_future`).
    /// Fails with the existing pointer if a future is already attached, so
    /// the caller can decide whether to free or retry.
    #[inline(always)]
    pub fn attach_future(&self, future_ptr: *mut ()) -> Result<(), *mut ()> {
        self.future_ptr
            .compare_exchange(
                ptr::null_mut(),
                future_ptr,
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .map(|_| ())
            .map_err(|existing| existing)
    }

    /// Detaches and returns the future pointer, transferring ownership (and
    /// the duty to `FutureAllocator::drop_boxed` it) to the caller.
    #[inline(always)]
    pub fn take_future(&self) -> Option<*mut ()> {
        let ptr = self.future_ptr.swap(ptr::null_mut(), Ordering::AcqRel);
        if ptr.is_null() { None } else { Some(ptr) }
    }

    /// Re-initializes this task in place for reuse at a new arena position.
    ///
    /// # Safety
    /// `slot_ptr` must point to a live `TaskSlot` and `signal_ptr` must
    /// outlive the task; no other thread may access the task during reset.
    /// NOTE(review): pointers are overwritten without dropping — a still
    /// attached future or pinned generator leaks. Confirm callers drain via
    /// `take_future` / `take_pinned_generator` first.
    #[inline(always)]
    pub unsafe fn reset(
        &mut self,
        global_id: u32,
        leaf_idx: u16,
        signal_idx: u8,
        signal_bit: u8,
        signal_ptr: *const TaskSignal,
        slot_ptr: *mut TaskSlot,
    ) {
        self.global_id = global_id;
        self.leaf_idx = leaf_idx;
        self.signal_idx = signal_idx;
        self.signal_bit = signal_bit;
        self.signal_ptr = signal_ptr;
        self.slot_ptr.store(slot_ptr, Ordering::Release);
        let slot = unsafe { &*slot_ptr };
        slot.set_task_ptr(self as *mut Task);
        self.state.store(TASK_IDLE, Ordering::Relaxed);
        self.yielded.store(false, Ordering::Relaxed);
        self.cpu_time_enabled.store(false, Ordering::Relaxed);
        self.future_ptr.store(ptr::null_mut(), Ordering::Relaxed);
        self.pinned_generator_ptr.store(ptr::null_mut(), Ordering::Relaxed);
        self.generator_run_mode.store(GeneratorRunMode::None as u8, Ordering::Relaxed);
        unsafe {
            let stats = &mut *self.stats.get();
            stats.reset();
        }
    }

    /// Check if this task has a pinned generator
    #[inline(always)]
    pub fn has_pinned_generator(&self) -> bool {
        !self.pinned_generator_ptr.load(Ordering::Acquire).is_null()
    }

    /// Get the pinned generator for this task (if any)
    /// Safety: Only call from the worker thread that owns this task
    /// NOTE(review): the returned `'a` lifetime is unbounded — the reference
    /// can outlive the pinned box; confirm callers constrain its use.
    #[inline(always)]
    pub unsafe fn get_pinned_generator<'a>(&self) -> Option<&'a mut Box<dyn Iterator<Item = usize> + 'static>> {
        let ptr = self.pinned_generator_ptr.load(Ordering::Acquire);
        if ptr.is_null() {
            None
        } else {
            unsafe { Some(&mut *(ptr as *mut Box<dyn Iterator<Item = usize> + 'static>)) }
        }
    }

    /// Pin a generator to this task
    /// Safety: Only call from the worker thread that owns this task
    /// NOTE(review): an already-pinned generator is overwritten without being
    /// freed (leak). Confirm callers guarantee no generator is pinned, or
    /// take the old one first via `take_pinned_generator`.
    #[inline(always)]
    pub unsafe fn pin_generator(&self, generator_box: Box<dyn Iterator<Item = usize> + 'static>, mode: GeneratorRunMode) {
        // Double-boxed so the erased pointer is thin, mirroring FutureAllocator.
        let ptr = Box::into_raw(Box::new(generator_box)) as *mut ();
        self.pinned_generator_ptr.store(ptr, Ordering::Release);
        self.generator_run_mode.store(mode as u8, Ordering::Release);
    }
    
    /// Get the generator run mode
    #[inline(always)]
    pub fn generator_run_mode(&self) -> GeneratorRunMode {
        let mode = self.generator_run_mode.load(Ordering::Acquire);
        // Unknown raw values decode to None.
        match mode {
            1 => GeneratorRunMode::Switch,
            2 => GeneratorRunMode::Poll,
            _ => GeneratorRunMode::None,
        }
    }
    
    /// Set the generator run mode
    /// Safety: Only call from the worker thread that owns this task
    #[inline(always)]
    pub unsafe fn set_generator_run_mode(&self, mode: GeneratorRunMode) {
        self.generator_run_mode.store(mode as u8, Ordering::Release);
    }

    /// Take ownership of the pinned generator (for cleanup)
    /// Safety: Only call from the worker thread that owns this task
    #[inline(always)]
    pub unsafe fn take_pinned_generator(&self) -> Option<Box<Box<dyn Iterator<Item = usize> + 'static>>> {
        let ptr = self.pinned_generator_ptr.swap(ptr::null_mut(), Ordering::AcqRel);
        if ptr.is_null() {
            None
        } else {
            unsafe { Some(Box::from_raw(ptr as *mut Box<dyn Iterator<Item = usize> + 'static>)) }
        }
    }
}
1008
pub struct TaskArena {
    // Base of the platform-allocated backing region (mmap / VirtualAlloc).
    memory: NonNull<u8>,
    // Size in bytes of the backing region; passed to munmap on drop.
    size: usize,
    // Leaf/slot counts the arena was constructed with.
    config: TaskArenaConfig,
    // Byte offsets and per-leaf signal counts derived from `config`.
    layout: ArenaLayout,
    task_signals: Box<[TaskSignal]>, // Heap-allocated task signals
    // Live-task counter maintained via increment/decrement_total_tasks (stats only).
    total_tasks: AtomicU64,
    // Set by `close()`, read by `is_closed()`.
    is_closed: AtomicBool,
}
1018
/// Errors that can occur when spawning a new task into the arena.
///
/// Implements `Display` and `std::error::Error`, so values can be reported
/// directly or boxed into `Box<dyn Error>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpawnError {
    /// The arena is closed and no longer accepts new tasks.
    Closed,
    /// All task slots are currently in use.
    NoCapacity,
    /// The reserved task slot already had an attached future.
    AttachFailed,
}
1029
1030impl fmt::Display for SpawnError {
1031    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1032        match self {
1033            SpawnError::Closed => write!(f, "executor arena is closed"),
1034            SpawnError::NoCapacity => write!(f, "no task slots available"),
1035            SpawnError::AttachFailed => write!(f, "task slot already has a future attached"),
1036        }
1037    }
1038}
1039
1040impl std::error::Error for SpawnError {}
1041
// SAFETY: TaskArena holds raw pointers (which make it !Send/!Sync by default),
// but all cross-thread state transitions visible here go through atomics
// (TaskSlot pointers, TaskSignal words, the counters above).
// NOTE(review): soundness also depends on callers honoring the
// "only from the owning worker thread" contracts on Task's unsafe methods — confirm.
unsafe impl Send for TaskArena {}
unsafe impl Sync for TaskArena {}
1044
impl TaskArena {
    /// Allocate the backing region per `ArenaLayout` and build the arena.
    ///
    /// When `options.preinitialize_tasks` is set, the region is zeroed and all
    /// tasks are constructed eagerly; otherwise tasks are constructed lazily
    /// on first access (see `ensure_task_initialized`).
    pub fn with_config(config: TaskArenaConfig, options: TaskArenaOptions) -> io::Result<Self> {
        let layout = ArenaLayout::new(&config);
        let memory_ptr = Self::allocate_memory(layout.total_size, &options)?;
        let memory = NonNull::new(memory_ptr)
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "allocation returned null"))?;

        if options.preinitialize_tasks {
            // SAFETY: `memory` is a fresh, exclusively-owned allocation of
            // exactly `layout.total_size` bytes.
            unsafe {
                ptr::write_bytes(memory.as_ptr(), 0, layout.total_size);
            }
        }

        // Allocate task signals on the heap
        let signal_count = config.leaf_count * layout.signals_per_leaf;
        let task_signals = (0..signal_count)
            .map(|_| TaskSignal::new())
            .collect::<Vec<_>>()
            .into_boxed_slice();

        let arena = TaskArena {
            memory,
            size: layout.total_size,
            config,
            layout,
            task_signals,
            total_tasks: AtomicU64::new(0),
            is_closed: AtomicBool::new(false),
        };

        // Slot headers are always written up front so lazy task init can CAS on them.
        arena.initialize_task_slots();
        if options.preinitialize_tasks {
            arena.initialize_tasks();
        }
        Ok(arena)
    }

    /// Convenience constructor using default `TaskArenaOptions`.
    pub fn new(leaf_count: usize, tasks_per_leaf: usize) -> io::Result<Self> {
        Self::with_config(
            TaskArenaConfig::new(leaf_count, tasks_per_leaf)?,
            TaskArenaOptions::default(),
        )
    }

    /// Whether the arena has been closed to new spawns.
    #[inline]
    pub fn is_closed(&self) -> bool {
        self.is_closed.load(Ordering::Acquire)
    }

    /// Mark the arena closed; existing tasks are unaffected.
    #[inline]
    pub fn close(&self) {
        self.is_closed.store(true, Ordering::Release);
    }

    /// Configuration the arena was built with.
    #[inline]
    pub fn config(&self) -> &TaskArenaConfig {
        &self.config
    }

    /// Derived memory layout (offsets, per-leaf signal counts).
    #[inline]
    pub fn layout(&self) -> &ArenaLayout {
        &self.layout
    }

    /// Bump the live-task counter (Relaxed: feeds `stats()` only).
    #[inline]
    pub fn increment_total_tasks(&self) {
        self.total_tasks.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrement the live-task counter (Relaxed: feeds `stats()` only).
    #[inline]
    pub fn decrement_total_tasks(&self) {
        self.total_tasks.fetch_sub(1, Ordering::Relaxed);
    }

    /// Write a null-task `TaskSlot` header into every slot position.
    fn initialize_task_slots(&self) {
        let total = self.config.leaf_count * self.config.tasks_per_leaf;
        let slots_ptr = self.task_slots_ptr();

        // SAFETY: the slot region spans exactly `total` TaskSlot entries
        // per the ArenaLayout used to size the allocation.
        unsafe {
            for idx in 0..total {
                let slot_ptr = slots_ptr.add(idx);
                ptr::write(slot_ptr, TaskSlot::new(ptr::null_mut()));
            }
        }
    }

    /// Eagerly construct every task in place (preinitialize_tasks mode only).
    fn initialize_tasks(&self) {
        let tasks_per_leaf = self.config.tasks_per_leaf;
        let tasks_ptr = self.tasks_ptr();

        // SAFETY: the task region spans leaf_count * tasks_per_leaf entries,
        // and the slots were just written by initialize_task_slots.
        unsafe {
            for leaf in 0..self.config.leaf_count {
                for slot in 0..tasks_per_leaf {
                    let idx = leaf * tasks_per_leaf + slot;
                    // Each TaskSignal word covers 64 slots.
                    let signal_idx = slot / 64;
                    let signal_bit = (slot % 64) as u8;
                    let signal_ptr = self.task_signal_ptr(leaf, signal_idx);
                    // Same value as `idx`; recomputed to make the id derivation explicit.
                    let global_id = (leaf * tasks_per_leaf + slot) as u32;
                    let task_ptr = tasks_ptr.add(idx);
                    let slot_ptr = self.task_slot_ptr(leaf, slot);
                    Task::construct(
                        task_ptr,
                        global_id,
                        leaf as u16,
                        signal_idx as u8,
                        signal_bit,
                        signal_ptr,
                        slot_ptr,
                    );
                    // Note: summary_tree_ptr is bound later when task is actually used
                }
            }
        }
    }

    /// Base pointer of the task array inside the backing region.
    #[inline]
    fn tasks_ptr(&self) -> *mut Task {
        unsafe { self.memory.as_ptr().add(self.layout.task_offset) as *mut Task }
    }

    /// Base pointer of the task-slot array inside the backing region.
    #[inline]
    fn task_slots_ptr(&self) -> *mut TaskSlot {
        unsafe { self.memory.as_ptr().add(self.layout.task_slot_offset) as *mut TaskSlot }
    }

    /// Pointer to one slot, addressed by (leaf, slot-within-leaf).
    #[inline]
    fn task_slot_ptr(&self, leaf_idx: usize, slot_idx: usize) -> *mut TaskSlot {
        debug_assert!(leaf_idx < self.config.leaf_count);
        debug_assert!(slot_idx < self.config.tasks_per_leaf);
        unsafe {
            self.task_slots_ptr()
                .add(leaf_idx * self.config.tasks_per_leaf + slot_idx)
        }
    }

    /// Pointer to one signal word, addressed by (leaf, signal-within-leaf).
    #[inline]
    pub fn task_signal_ptr(&self, leaf_idx: usize, signal_idx: usize) -> *const TaskSignal {
        let index = leaf_idx * self.layout.signals_per_leaf + signal_idx;
        &self.task_signals[index] as *const TaskSignal
    }

    /// Pointer to the first signal word of a leaf (the leaf's signal array).
    #[inline]
    pub fn active_signals(&self, leaf_idx: usize) -> *const TaskSignal {
        debug_assert!(leaf_idx < self.config.leaf_count);
        let index = leaf_idx * self.layout.signals_per_leaf;
        &self.task_signals[index] as *const TaskSignal
    }

    #[inline]
    pub fn leaf_count(&self) -> usize {
        self.config.leaf_count
    }

    #[inline]
    pub fn signals_per_leaf(&self) -> usize {
        self.layout.signals_per_leaf
    }

    #[inline]
    pub fn tasks_per_leaf(&self) -> usize {
        self.config.tasks_per_leaf
    }

    /// Flatten (leaf, slot) into a task's global id.
    #[inline]
    pub fn compose_id(&self, leaf_idx: usize, slot_idx: usize) -> u32 {
        (leaf_idx * self.config.tasks_per_leaf + slot_idx) as u32
    }

    /// Inverse of `compose_id`: split a global id into (leaf, slot).
    #[inline]
    pub fn decompose_id(&self, global_id: u32) -> (usize, usize) {
        let tasks_per_leaf = self.config.tasks_per_leaf;
        let leaf_idx = (global_id as usize) / tasks_per_leaf;
        let slot_idx = (global_id as usize) % tasks_per_leaf;
        (leaf_idx, slot_idx)
    }

    /// Borrow a task, lazily constructing it on first access.
    ///
    /// # Safety
    /// The arena must outlive the returned reference; indices must be in
    /// range (debug-asserted only in release-free builds).
    ///
    /// # Panics
    /// Panics if `ensure_task_initialized` reports the slot index out of range.
    #[inline]
    pub unsafe fn task(&self, leaf_idx: usize, slot_idx: usize) -> &Task {
        debug_assert!(leaf_idx < self.config.leaf_count);
        debug_assert!(slot_idx < self.config.tasks_per_leaf);
        let signal_idx = slot_idx / 64;
        let bit_idx = (slot_idx % 64) as u8;
        let task_ptr = self
            .ensure_task_initialized(leaf_idx, signal_idx, bit_idx)
            .expect("task slot not initialized");
        unsafe { &*task_ptr.as_ptr() }
    }

    /// Lock-free lazy task construction.
    ///
    /// The slot's task pointer acts as a three-state latch:
    /// null (uninitialized) -> sentinel 0x1 (initialization in progress) ->
    /// real pointer (published). The thread winning the null->sentinel CAS
    /// constructs the task; losers spin until the real pointer appears.
    /// Returns `None` only when the (signal, bit) location is out of range.
    fn ensure_task_initialized(
        &self,
        leaf_idx: usize,
        signal_idx: usize,
        bit_idx: u8,
    ) -> Option<NonNull<Task>> {
        let slot_idx = signal_idx * 64 + bit_idx as usize;
        if slot_idx >= self.config.tasks_per_leaf {
            return None;
        }

        let slot_ptr = self.task_slot_ptr(leaf_idx, slot_idx);
        let slot = unsafe { &*slot_ptr };

        // Fast path: task already initialized
        let existing = slot.task_ptr();
        if !existing.is_null() {
            return NonNull::new(existing);
        }

        debug_assert!(signal_idx < self.layout.signals_per_leaf);
        debug_assert!(bit_idx < 64);

        let idx = leaf_idx * self.config.tasks_per_leaf + slot_idx;
        let task_ptr = unsafe { self.tasks_ptr().add(idx) };
        let signal_ptr = self.task_signal_ptr(leaf_idx, signal_idx);
        let global_id = self.compose_id(leaf_idx, slot_idx);

        // Atomically claim the right to initialize this task
        // Use a sentinel value during initialization to prevent concurrent initialization
        let sentinel = 0x1 as *mut Task; // Non-null but invalid pointer
        match slot.task_ptr_compare_exchange(ptr::null_mut(), sentinel) {
            Ok(_) => {
                // We won the race - initialize the task
                unsafe {
                    Task::construct(
                        task_ptr,
                        global_id,
                        leaf_idx as u16,
                        signal_idx as u8,
                        bit_idx,
                        signal_ptr,
                        slot_ptr,
                    );
                    // Note: summary_tree_ptr is bound later via init_task

                    // Publish the initialized task pointer
                    slot.set_task_ptr(task_ptr);
                }
                NonNull::new(task_ptr)
            }
            Err(actual) => {
                // Another thread is initializing or already initialized
                // Wait for initialization to complete
                // (`actual` is intentionally unused: the observed value may
                // already be stale, so we re-read the slot until it settles.)
                loop {
                    let ptr = slot.task_ptr();
                    if ptr != sentinel && !ptr.is_null() {
                        return NonNull::new(ptr);
                    }
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Handle for the task at (leaf, signal word, bit), lazily constructing
    /// it; `None` when the location is out of range.
    #[inline]
    pub fn handle_for_location(
        &self,
        leaf_idx: usize,
        signal_idx: usize,
        bit_idx: u8,
    ) -> Option<TaskHandle> {
        self.ensure_task_initialized(leaf_idx, signal_idx, bit_idx)
            .map(TaskHandle::from_non_null)
    }

    /// Reset the task identified by `global_id` and bind its summary tree on
    /// first use (subsequent calls keep the originally bound tree).
    ///
    /// # Panics
    /// Panics if the decomposed location cannot be initialized.
    pub fn init_task(&self, global_id: u32, summary_tree: *const Summary) {
        let (leaf_idx, slot_idx) = self.decompose_id(global_id);
        let signal_idx = slot_idx / 64;
        let signal_bit = (slot_idx % 64) as u8;

        debug_assert!(signal_idx < self.layout.signals_per_leaf);

        let task_ptr = self
            .ensure_task_initialized(leaf_idx, signal_idx, signal_bit)
            .expect("failed to initialize task slot");

        // SAFETY: `task_ptr` was just produced/validated by
        // ensure_task_initialized and points into the arena's task region.
        unsafe {
            let task = &mut *task_ptr.as_ptr();
            let slot_ptr = self.task_slot_ptr(leaf_idx, slot_idx);
            let signal_ptr = self.task_signal_ptr(leaf_idx, signal_idx);
            task.reset(
                global_id,
                leaf_idx as u16,
                signal_idx as u8,
                signal_bit,
                signal_ptr,
                slot_ptr,
            );
            // Bind only once; later resets must not rebind.
            if task.summary_tree_ptr.is_null() {
                task.bind_summary_tree(summary_tree);
            }
        }
    }

    // Note: Task management methods (reserve_task, release_task, activate_task, deactivate_task)
    // have been moved to WorkerService since they require access to SummaryTree
    // which is now owned by WorkerService

    /// Intentionally a no-op; see trailing comment.
    #[allow(dead_code)]
    pub fn schedule_task_timer(
        &self,
        task: TaskHandle,
        timer: &TimerHandle,
        worker_id: u32,
        deadline_ns: u64,
    ) {
        let _ = (task, timer, worker_id, deadline_ns);
        // Timer scheduling through TaskHandle is not supported under the identity-only timer model.
    }

    /// Reinterpret an opaque payload pointer as a task handle (`None` for null).
    #[inline(always)]
    pub(crate) fn task_handle_from_payload(ptr: *mut ()) -> Option<TaskHandle> {
        NonNull::new(ptr as *mut Task).map(TaskHandle::from_non_null)
    }

    /// Point-in-time snapshot of capacity/occupancy; `worker_count` is 0 here.
    pub fn stats(&self) -> TaskArenaStats {
        TaskArenaStats {
            total_capacity: self.config.leaf_count * self.config.tasks_per_leaf,
            active_tasks: self.total_tasks.load(Ordering::Relaxed) as usize,
            worker_count: 0, // Worker count now managed by WorkerService
        }
    }

    /// Platform page allocation: mmap on unix, VirtualAlloc on windows.
    ///
    /// With `use_huge_pages` on Linux the mapping requests 2 MiB huge pages;
    /// note there is no fallback to normal pages — if huge pages are
    /// unavailable the OS error is returned to the caller.
    fn allocate_memory(size: usize, options: &TaskArenaOptions) -> io::Result<*mut u8> {
        #[cfg(unix)]
        {
            let mut flags = MAP_PRIVATE | MAP_ANONYMOUS;
            #[cfg(target_os = "linux")]
            if options.use_huge_pages {
                flags |= MAP_HUGETLB | MAP_HUGE_2MB;
            }
            let ptr = unsafe { mmap(ptr::null_mut(), size, PROT_READ | PROT_WRITE, flags, -1, 0) };
            if ptr == MAP_FAILED {
                Err(io::Error::last_os_error())
            } else {
                Ok(ptr as *mut u8)
            }
        }

        #[cfg(windows)]
        {
            let mut flags = MEM_RESERVE | MEM_COMMIT;
            if options.use_huge_pages {
                flags |= MEM_LARGE_PAGES;
            }
            let ptr = unsafe { VirtualAlloc(ptr::null_mut(), size, flags, PAGE_READWRITE) };
            if ptr.is_null() {
                Err(io::Error::last_os_error())
            } else {
                Ok(ptr as *mut u8)
            }
        }
    }
}
1398
impl Drop for TaskArena {
    fn drop(&mut self) {
        // Release the raw backing region. Tasks/slots live inside this region,
        // so this does NOT run any Drop impls for them.
        // NOTE(review): pinned generators / attached futures are separate heap
        // allocations referenced from tasks in this region — confirm they are
        // released before the arena is dropped, or they leak.
        unsafe {
            #[cfg(unix)]
            {
                munmap(self.memory.as_ptr() as *mut _, self.size);
            }

            #[cfg(windows)]
            {
                VirtualFree(self.memory.as_ptr() as *mut _, 0, MEM_RELEASE);
            }
        }
    }
}
1414
/// Point-in-time occupancy snapshot returned by `TaskArena::stats`.
#[derive(Clone, Copy, Debug)]
pub struct TaskArenaStats {
    /// Maximum number of tasks the arena can hold (leaf_count * tasks_per_leaf).
    pub total_capacity: usize,
    /// Tasks currently accounted for via increment/decrement_total_tasks.
    pub active_tasks: usize,
    /// Always 0 from `TaskArena::stats`; worker counts live in WorkerService.
    pub worker_count: usize,
}
1421
1422// Tests for Task, TaskSignal, TaskArena, and related functionality
1423#[cfg(test)]
1424mod tests {
1425    use super::*;
1426    use crate::runtime::worker::Worker;
1427    use std::future::poll_fn;
1428    use std::mem::{self, MaybeUninit};
1429    use std::ptr;
1430    use std::sync::Arc;
1431    use std::sync::atomic::{AtomicUsize, Ordering};
1432    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1433
    // Minimal no-op waker plumbing so tests can poll futures without a runtime.
    unsafe fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
    }

    unsafe fn noop(_: *const ()) {}

    static NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

    /// Waker whose wake/drop hooks do nothing; sufficient for manual polls.
    fn noop_waker() -> Waker {
        unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }
    }
1445
    /// Build an arena with default options; panics on config/allocation
    /// failure (acceptable in tests).
    fn setup_arena(leaf_count: usize, tasks_per_leaf: usize) -> Arc<TaskArena> {
        let config = TaskArenaConfig::new(leaf_count, tasks_per_leaf).unwrap();
        Arc::new(TaskArena::with_config(config, TaskArenaOptions::default()).unwrap())
    }
1450
1451    #[test]
1452    fn task_signal_basic_operations() {
1453        let signal = TaskSignal::new();
1454        let (was_empty, was_set) = signal.set(5);
1455        assert!(was_empty);
1456        assert!(was_set);
1457        assert!(signal.is_set(5));
1458
1459        let (remaining, acquired) = signal.try_acquire(5);
1460        assert!(acquired);
1461        assert_eq!(remaining & (1 << 5), 0);
1462        assert!(!signal.is_set(5));
1463
1464        let (remaining, now_empty) = signal.clear(5);
1465        assert_eq!(remaining, 0);
1466        assert!(now_empty);
1467    }
1468
1469    #[test]
1470    fn task_signal_set_idempotent() {
1471        let signal = TaskSignal::new();
1472        assert_eq!(signal.set(3), (true, true));
1473        assert_eq!(signal.set(3), (false, false));
1474        assert!(signal.is_set(3));
1475    }
1476
1477    #[test]
1478    fn task_signal_clear_noop_when_absent() {
1479        let signal = TaskSignal::new();
1480        let (remaining, now_empty) = signal.clear(7);
1481        assert_eq!(remaining, 0);
1482        assert!(now_empty);
1483    }
1484
1485    #[test]
1486    fn task_signal_try_acquire_unset_bit() {
1487        let signal = TaskSignal::new();
1488        let (remaining, acquired) = signal.try_acquire(12);
1489        assert_eq!(remaining, 0);
1490        assert!(!acquired);
1491    }
1492
1493    #[test]
1494    fn task_signal_try_acquire_from_wraps() {
1495        let signal = TaskSignal::new();
1496        signal.set(2);
1497        signal.set(60);
1498
1499        let (bit, _) = signal
1500            .try_acquire_from(59)
1501            .expect("expected to acquire bit after wrap");
1502        assert_eq!(bit, 60);
1503
1504        let (bit, _) = signal
1505            .try_acquire_from(61)
1506            .expect("expected to wrap to remaining bit");
1507        assert_eq!(bit, 2);
1508        assert!(signal.try_acquire_from(0).is_none());
1509    }
1510
1511    #[test]
1512    fn task_signal_try_acquire_from_until_empty() {
1513        let signal = TaskSignal::new();
1514        signal.set(0);
1515        signal.set(1);
1516
1517        assert!(signal.try_acquire_from(0).is_some());
1518        assert!(signal.try_acquire_from(0).is_some());
1519        assert!(signal.try_acquire_from(0).is_none());
1520        assert_eq!(signal.load(Ordering::Relaxed), 0);
1521    }
1522
1523    #[test]
1524    fn task_signal_try_acquire_from_selects_nearest() {
1525        let signal = TaskSignal::new();
1526        for bit in [2u64, 6, 10] {
1527            signal.set(bit);
1528        }
1529        let (bit, remaining) = signal
1530            .try_acquire_from(5)
1531            .expect("expected to acquire a bit");
1532        assert_eq!(bit, 6);
1533        assert_ne!(remaining & (1 << 2), 0);
1534    }
1535
    // Disabled: exercises the schedule -> acquire -> begin -> finish lifecycle
    // and asserts the leaf summary bit clears; relies on reserve_task /
    // active_tree / release_task, which now live on WorkerService.
    #[test]
    #[ignore = "Needs WorkerService helper - uses reserve_task()"]
    #[cfg(feature = "disabled_tests")]
    fn schedule_begin_finish_flow_clears_summary() {
        let arena = setup_arena(1, 64);
        let handle = arena.reserve_task().expect("reserve task");
        let leaf = handle.leaf_idx();
        let signal_idx = handle.signal_idx();
        let bit_idx = handle.bit_idx();
        let global = handle.global_id(arena.tasks_per_leaf());
        arena.init_task(global);

        let slot_idx = signal_idx * 64 + bit_idx as usize;
        let task = unsafe { arena.task(leaf, slot_idx) };
        let signal = unsafe { &*task.signal_ptr };

        assert_eq!(signal.load(Ordering::Relaxed), 0);
        task.schedule();
        assert!(signal.is_set(task.signal_bit));
        assert_ne!(
            arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
            0
        );

        let (remaining, acquired) = signal.try_acquire(bit_idx);
        assert!(acquired);
        if remaining == 0 {
            arena.active_tree().mark_signal_inactive(leaf, signal_idx);
        }
        task.begin();
        task.finish();
        assert_eq!(
            arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
            0
        );

        arena.release_task(handle);
    }
1574
    // Disabled: verifies that a schedule() racing with an executing task keeps
    // the summary bit set after finish(); needs WorkerService helpers.
    #[test]
    #[ignore = "Needs WorkerService helper - uses reserve_task()"]
    #[cfg(feature = "disabled_tests")]
    fn finish_reschedules_when_work_arrives_during_execution() {
        let arena = setup_arena(1, 64);
        let handle = arena.reserve_task().expect("reserve task");
        let leaf = handle.leaf_idx();
        let signal_idx = handle.signal_idx();
        let bit_idx = handle.bit_idx();
        let global = handle.global_id(arena.tasks_per_leaf());
        arena.init_task(global);
        let slot_idx = signal_idx * 64 + bit_idx as usize;
        let task = unsafe { arena.task(leaf, slot_idx) };
        let signal = unsafe { &*task.signal_ptr };

        task.schedule();
        let (remaining, acquired) = signal.try_acquire(bit_idx);
        assert!(acquired);
        if remaining == 0 {
            arena.active_tree().mark_signal_inactive(leaf, signal_idx);
        }
        task.begin();
        // concurrent producer schedules additional work
        task.schedule();
        task.finish();
        assert_ne!(
            arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
            0,
            "queue should remain visible after concurrent schedule"
        );

        arena.deactivate_task(handle);
        arena.release_task(handle);
    }
1609
1610    #[test]
1611    fn task_handle_reports_task_fields() {
1612        let arena = setup_arena(4, 128);
1613        let leaf = 2;
1614        let slot = 113;
1615        let signal = slot / 64;
1616        let bit = (slot % 64) as u8;
1617        let task = unsafe { arena.task(leaf, slot) };
1618        let handle = TaskHandle::from_task(task);
1619        assert_eq!(handle.leaf_idx(), leaf);
1620        assert_eq!(handle.signal_idx(), signal);
1621        assert_eq!(handle.bit_idx(), bit);
1622    }
1623
1624    #[test]
1625    fn task_handle_global_id_matches_components() {
1626        let arena = setup_arena(2, 128);
1627        let leaf = 1;
1628        let slot = 70;
1629        let task = unsafe { arena.task(leaf, slot) };
1630        let handle = TaskHandle::from_task(task);
1631        assert_eq!(handle.global_id(arena.tasks_per_leaf()), task.global_id());
1632    }
1633
    // Dropping a null future pointer must be a safe no-op.
    #[test]
    fn future_helpers_drop_boxed_accepts_null() {
        unsafe {
            FutureAllocator::drop_boxed(ptr::null_mut());
        }
    }
1640
    // Polling a null future pointer must return None rather than crash.
    #[test]
    fn future_helpers_poll_boxed_accepts_null() {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(unsafe { FutureAllocator::poll_boxed(ptr::null_mut(), &mut cx) }.is_none());
    }
1647
    // Disabled: asserts Task::construct writes every field; references the
    // removed `slot_idx` field and an 8-argument construct signature.
    #[test]
    #[cfg(feature = "disabled_tests")] // References old Task fields (slot_idx)
    fn task_construct_initializes_fields() {
        let mut storage = MaybeUninit::<Task>::uninit();
        let mut slot_storage = MaybeUninit::<TaskSlot>::uninit();
        let signal = TaskSignal::new();
        unsafe {
            slot_storage
                .as_mut_ptr()
                .write(TaskSlot::new(storage.as_mut_ptr()));
            Task::construct(
                storage.as_mut_ptr(),
                42,
                1,
                2,
                3,
                4,
                &signal as *const _,
                slot_storage.as_mut_ptr(),
            );
            let task = &*storage.as_ptr();
            assert_eq!(task.global_id, 42);
            assert_eq!(task.leaf_idx, 1);
            assert_eq!(task.signal_idx, 2);
            assert_eq!(task.slot_idx, 3);
            assert_eq!(task.signal_bit, 4);
            assert_eq!(task.state.load(Ordering::Relaxed), TASK_IDLE);
            assert!(!task.yielded.load(Ordering::Relaxed));
            assert!(task.future_ptr.load(Ordering::Relaxed).is_null());
            ptr::drop_in_place(storage.as_mut_ptr());
            ptr::drop_in_place(slot_storage.as_mut_ptr());
        }
    }
1681
    // Disabled: exercises the removed arena_ptr/bind_arena Task API.
    #[test]
    #[cfg(feature = "disabled_tests")] // References old Task fields (arena_ptr, bind_arena)
    fn task_bind_arena_sets_pointer() {
        let mut storage = MaybeUninit::<Task>::uninit();
        let mut slot_storage = MaybeUninit::<TaskSlot>::uninit();
        let signal = TaskSignal::new();
        let arena = setup_arena(1, 64);
        unsafe {
            slot_storage
                .as_mut_ptr()
                .write(TaskSlot::new(storage.as_mut_ptr()));
            Task::construct(
                storage.as_mut_ptr(),
                5,
                0,
                0,
                0,
                0,
                &signal as *const _,
                slot_storage.as_mut_ptr(),
            );
            let task = &*storage.as_ptr();
            assert!(task.arena_ptr.load(Ordering::Relaxed).is_null());
            task.bind_arena(Arc::as_ptr(&arena));
            assert_eq!(
                task.arena_ptr.load(Ordering::Relaxed),
                Arc::as_ptr(&arena) as *mut ExecutorArena
            );
            ptr::drop_in_place(storage.as_mut_ptr());
            ptr::drop_in_place(slot_storage.as_mut_ptr());
        }
    }
1714
1715    // The following tests require WorkerService helper methods (reserve_task, release_task, active_tree)
1716    // that have been moved from TaskArena to WorkerService. They are disabled until we create
1717    // a test helper that sets up WorkerService properly.
1718    #[cfg(feature = "disabled_tests")]
1719    mod needs_worker_service {
1720        use super::*;
1721
        // Disabled: a second schedule() on an already-scheduled task must change
        // neither the leaf summary nor the signal word.
        #[test]
        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
        fn task_schedule_is_idempotent() {
            let arena = setup_arena(1, 64);
            let handle = arena.reserve_task().expect("reserve task");
            let global = handle.global_id(arena.tasks_per_leaf());
            arena.init_task(global);
            let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
            let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
            let signal = unsafe { &*task.signal_ptr };

            task.schedule();
            let summary_after_first = arena
                .active_summary(handle.leaf_idx())
                .load(Ordering::Relaxed);
            assert!(summary_after_first & (1 << handle.signal_idx()) != 0);

            task.schedule();
            let summary_after_second = arena
                .active_summary(handle.leaf_idx())
                .load(Ordering::Relaxed);
            assert_eq!(summary_after_first, summary_after_second);
            assert_eq!(signal.load(Ordering::Relaxed), 1 << task.signal_bit);

            arena.deactivate_task(handle);
            arena.release_task(handle);
        }
1749
        // Disabled: begin() must move a scheduled task to TASK_EXECUTING.
        #[test]
        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
        fn task_begin_overwrites_state() {
            let arena = setup_arena(1, 64);
            let handle = arena.reserve_task().expect("reserve task");
            let global = handle.global_id(arena.tasks_per_leaf());
            arena.init_task(global);
            let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
            let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };

            task.schedule();
            task.begin();
            assert_eq!(task.state.load(Ordering::Relaxed), TASK_EXECUTING);

            arena.deactivate_task(handle);
            arena.release_task(handle);
        }
1767
        // Disabled: finish() with no pending reschedule must leave the task
        // idle with its signal bit and summary bit both clear.
        #[test]
        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
        fn task_finish_without_new_schedule_clears_signal() {
            let arena = setup_arena(1, 64);
            let handle = arena.reserve_task().expect("reserve task");
            let leaf = handle.leaf_idx();
            let signal_idx = handle.signal_idx();
            let bit_idx = handle.bit_idx();
            let global = handle.global_id(arena.tasks_per_leaf());
            arena.init_task(global);
            let slot_idx = signal_idx * 64 + bit_idx as usize;
            let task = unsafe { arena.task(leaf, slot_idx) };
            let signal = unsafe { &*task.signal_ptr };

            task.schedule();
            let (remaining, acquired) = signal.try_acquire(bit_idx);
            assert!(acquired);
            if remaining == 0 {
                arena.active_tree().mark_signal_inactive(leaf, signal_idx);
            }
            task.begin();
            task.finish();

            assert_eq!(task.state.load(Ordering::Relaxed), TASK_IDLE);
            assert_eq!(signal.load(Ordering::Relaxed), 0);
            assert_eq!(
                arena.active_summary(leaf).load(Ordering::Relaxed) & (1 << signal_idx),
                0
            );

            arena.release_task(handle);
        }
1800
        // Disabled: finish_and_schedule() must land the task in TASK_SCHEDULED
        // with its signal and summary bits set.
        #[test]
        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
        fn task_finish_and_schedule_sets_signal() {
            let arena = setup_arena(1, 64);
            let handle = arena.reserve_task().expect("reserve task");
            let leaf = handle.leaf_idx();
            let signal_idx = handle.signal_idx();
            let bit_idx = handle.bit_idx();
            let global = handle.global_id(arena.tasks_per_leaf());
            arena.init_task(global);
            let slot_idx = signal_idx * 64 + bit_idx as usize;
            let task = unsafe { arena.task(leaf, slot_idx) };
            let signal = unsafe { &*task.signal_ptr };

            task.finish_and_schedule();
            assert_eq!(task.state.load(Ordering::Relaxed), TASK_SCHEDULED);
            assert!(signal.is_set(task.signal_bit));
            assert_ne!(
                arena.active_summary(leaf).load(Ordering::Relaxed) & (1 << signal_idx),
                0
            );

            arena.deactivate_task(handle);
            arena.release_task(handle);
        }
1826
1827        #[test]
1828        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1829        fn task_clear_yielded_and_is_yielded() {
1830            let arena = setup_arena(1, 64);
1831            let handle = arena.reserve_task().expect("reserve task");
1832            let global = handle.global_id(arena.tasks_per_leaf());
1833            arena.init_task(global);
1834            let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1835            let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1836
1837            task.yielded.store(true, Ordering::Relaxed);
1838            assert!(task.is_yielded());
1839            task.clear_yielded();
1840            assert!(!task.is_yielded());
1841
1842            arena.release_task(handle);
1843        }
1844
1845        #[test]
1846        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1847        fn task_attach_future_rejects_second_future() {
1848            let arena = setup_arena(1, 64);
1849            let handle = arena.reserve_task().expect("reserve task");
1850            let global = handle.global_id(arena.tasks_per_leaf());
1851            arena.init_task(global);
1852            let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1853            let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1854
1855            let first_ptr = FutureAllocator::box_future(async {});
1856            task.attach_future(first_ptr).unwrap();
1857            let second_ptr = FutureAllocator::box_future(async {});
1858            let existing = task.attach_future(second_ptr).unwrap_err();
1859            assert_eq!(existing, first_ptr);
1860            unsafe { FutureAllocator::drop_boxed(second_ptr) };
1861
1862            let ptr = task.take_future().unwrap();
1863            unsafe { FutureAllocator::drop_boxed(ptr) };
1864            arena.release_task(handle);
1865        }
1866
1867        #[test]
1868        #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1869        fn task_take_future_clears_pointer() {
1870            let arena = setup_arena(1, 64);
1871            let handle = arena.reserve_task().expect("reserve task");
1872            let global = handle.global_id(arena.tasks_per_leaf());
1873            arena.init_task(global);
1874            let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1875            let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1876
1877            let future_ptr = FutureAllocator::box_future(async {});
1878            task.attach_future(future_ptr).unwrap();
1879            let returned = task.take_future().unwrap();
1880            assert_eq!(returned, future_ptr);
1881            assert!(task.take_future().is_none());
1882            unsafe { FutureAllocator::drop_boxed(returned) };
1883            arena.release_task(handle);
1884        }
1885
1886        // TODO: These tests need to be updated to use WorkerService instead of direct Worker construction
1887        // #[test]
1888        // fn arena_spawn_executes_future() {
1889        //     let arena = setup_arena(1, 8);
1890        //     let counter = Arc::new(AtomicUsize::new(0));
1891        //     let counter_clone = counter.clone();
1892        //
1893        //     let handle = arena
1894        //         .spawn(async move {
1895        //             counter_clone.fetch_add(1, Ordering::Relaxed);
1896        //         })
1897        //         .expect("spawn task");
1898        //
1899        //     // Worker construction now requires WorkerService
1900        //     // let service = WorkerService::start(arena.clone(), WorkerServiceConfig::default());
1901        //     // service.spawn_worker()?;
1902        //
1903        //     assert_eq!(counter.load(Ordering::Relaxed), 1);
1904        //     arena.release_task(handle);
1905        // }
1906        //
1907        // #[test]
1908        // fn arena_spawn_returns_no_capacity_error() {
1909        //     let arena = setup_arena(1, 1);
1910        //
1911        //     let handle = arena.spawn(async {}).expect("first spawn succeeds");
1912        //     let err = arena.spawn(async {}).expect_err("second spawn should fail");
1913        //     assert_eq!(err, SpawnError::NoCapacity);
1914        //
1915        //     arena.release_task(handle);
1916        // }
1917
1918        #[test]
1919        #[ignore = "Needs WorkerService helper - uses active_tree()"]
1920        fn reserve_task_in_leaf_exhaustion() {
1921            let arena = setup_arena(1, 64);
1922            let mut bits = Vec::with_capacity(64);
1923            for _ in 0..64 {
1924                let bit = arena
1925                    .active_tree()
1926                    .reserve_task_in_leaf(0, 0)
1927                    .expect("expected available bit");
1928                bits.push(bit);
1929            }
1930            assert!(arena.active_tree().reserve_task_in_leaf(0, 0).is_none());
1931            for bit in &bits {
1932                arena.active_tree().release_task_in_leaf(0, 0, *bit);
1933            }
1934        }
1935
1936        #[test]
1937        #[ignore = "Needs WorkerService helper - uses active_tree()"]
1938        fn reserve_task_in_leaf_after_release() {
1939            let arena = setup_arena(1, 64);
1940            let bit = arena
1941                .active_tree()
1942                .reserve_task_in_leaf(0, 0)
1943                .expect("expected bit");
1944            arena.active_tree().release_task_in_leaf(0, 0, bit);
1945            let new_bit = arena
1946                .active_tree()
1947                .reserve_task_in_leaf(0, 0)
1948                .expect("bit after release");
1949            arena.active_tree().release_task_in_leaf(0, 0, new_bit);
1950        }
1951    } // mod needs_worker_service
1952}