1use super::summary::Summary;
2use super::timer::TimerHandle;
3use super::worker::Worker;
4use crate::utils::bits;
5use std::cell::UnsafeCell;
6use std::fmt;
7use std::future::{Future, IntoFuture};
8use std::io;
9use std::pin::Pin;
10use std::ptr::{self, NonNull};
11use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU8, AtomicU64, Ordering};
12use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
13use std::time::{Duration, Instant};
14
15#[cfg(unix)]
16use libc::{MAP_ANONYMOUS, MAP_FAILED, MAP_PRIVATE, PROT_READ, PROT_WRITE, mmap, munmap};
17
18#[cfg(target_os = "linux")]
19use libc::{MAP_HUGE_2MB, MAP_HUGETLB};
20
21#[cfg(windows)]
22use winapi::um::memoryapi::{VirtualAlloc, VirtualFree};
23#[cfg(windows)]
24use winapi::um::winnt::{MEM_COMMIT, MEM_LARGE_PAGES, MEM_RELEASE, MEM_RESERVE, PAGE_READWRITE};
25
/// Heap-allocated, pinned, type-erased future with unit output that is `Send`.
pub type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Task state with no flag bits set: neither scheduled nor executing.
pub const TASK_IDLE: u8 = 0;
/// Task state flag bit: the task has been scheduled.
pub const TASK_SCHEDULED: u8 = 1;
/// Task state flag bit: the task is being executed.
pub const TASK_EXECUTING: u8 = 2;
/// Both flag bits set (`TASK_SCHEDULED | TASK_EXECUTING`).
pub const TASK_SCHEDULED_AND_EXECUTING: u8 = 3;
32
/// Allocation-time switches for a task arena.
///
/// Both switches default to `false`.
#[derive(Clone, Copy, Debug, Default)]
pub struct TaskArenaOptions {
    /// Ask the operating system for huge/large pages when mapping the arena.
    pub use_huge_pages: bool,
    /// Construct every task when the arena is created instead of on first use.
    pub preinitialize_tasks: bool,
}

impl TaskArenaOptions {
    /// Builds options with both switches given explicitly.
    pub fn new(use_huge_pages: bool, preinitialize_tasks: bool) -> Self {
        Self {
            use_huge_pages,
            preinitialize_tasks,
        }
    }

    /// Returns a copy with `use_huge_pages` replaced by `huge_pages`.
    pub fn with_huge_pages(self, huge_pages: bool) -> Self {
        Self {
            use_huge_pages: huge_pages,
            ..self
        }
    }

    /// Returns a copy with `preinitialize_tasks` replaced by `enabled`.
    pub fn with_preinitialized_tasks(self, enabled: bool) -> Self {
        Self {
            preinitialize_tasks: enabled,
            ..self
        }
    }
}
66
/// Sizing parameters for a task arena.
#[derive(Clone, Copy, Debug)]
pub struct TaskArenaConfig {
    /// Number of leaves; [`TaskArenaConfig::new`] rounds it up to a power of two.
    pub leaf_count: usize,
    /// Number of task slots in each leaf.
    pub tasks_per_leaf: usize,
    /// Upper bound on workers; [`TaskArenaConfig::new`] sets it to `leaf_count`.
    pub max_workers: usize,
}

impl TaskArenaConfig {
    /// Creates a configuration, rounding `leaf_count` up to the next power of two.
    ///
    /// # Errors
    ///
    /// Returns [`io::ErrorKind::InvalidInput`] when either argument is zero, or
    /// when `leaf_count` cannot be rounded up to a power of two without
    /// overflowing `usize`.
    pub fn new(leaf_count: usize, tasks_per_leaf: usize) -> io::Result<Self> {
        // Validate before rounding: `0usize.next_power_of_two()` is 1, so a
        // check placed after the rounding can never observe a zero leaf count.
        if leaf_count == 0 || tasks_per_leaf == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "leaf_count and tasks_per_leaf must be > 0",
            ));
        }

        // A value that is already a power of two is returned unchanged.
        let leaf_count = leaf_count.checked_next_power_of_two().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "leaf_count is too large to round up to a power of two",
            )
        })?;

        Ok(Self {
            leaf_count,
            tasks_per_leaf,
            max_workers: leaf_count,
        })
    }

    /// Sets `max_workers` to `workers`, raised to at least 1 and then capped at
    /// `leaf_count`.
    pub fn with_max_workers(mut self, workers: usize) -> Self {
        self.max_workers = workers.max(1).min(self.leaf_count);
        self
    }
}
104
105#[repr(C)]
106#[derive(Debug)]
107pub struct TaskSignal {
108 value: AtomicU64,
109}
110
111impl TaskSignal {
112 pub const fn new() -> Self {
113 Self {
114 value: AtomicU64::new(0),
115 }
116 }
117
118 #[inline]
119 pub fn load(&self, ordering: Ordering) -> u64 {
120 self.value.load(ordering)
121 }
122
123 #[inline(always)]
124 pub fn is_set(&self, bit_index: u64) -> bool {
125 let mask = 1u64 << bit_index;
126 (self.value.load(Ordering::Relaxed) & mask) != 0
127 }
128
129 #[inline(always)]
130 pub fn set(&self, bit_index: u64) -> (bool, bool) {
131 let mask = 1u64 << bit_index;
132 let prev = self.value.fetch_or(mask, Ordering::AcqRel);
133 (prev == 0, (prev & mask) == 0)
135 }
136
137 #[inline]
138 pub fn clear(&self, bit_index: u64) -> (u64, bool) {
139 let mask = 1u64 << bit_index;
140 let previous = self.value.fetch_and(!mask, Ordering::AcqRel);
141 let remaining = previous & !mask;
142 (remaining, remaining == 0)
143 }
144
145 #[inline]
146 pub fn try_acquire(&self, bit_index: u64) -> (u64, bool) {
147 if !self.is_set(bit_index) {
148 return (0, false);
149 }
150 let mask = 1u64 << bit_index;
151 let (_, previous, acquired) = bits::try_acquire(&self.value, bit_index as u64);
152 let remaining = previous & !mask;
153 (remaining, acquired)
154 }
155
156 #[inline]
157 pub fn try_acquire_from(&self, start_bit: u64) -> Option<(u32, u64)> {
158 let start = (start_bit as u64).min(63);
159 for _ in 0..64 {
160 let current = self.value.load(Ordering::Acquire);
161 if current == 0 {
162 return None;
163 }
164
165 let candidate = bits::find_nearest(current, start);
166 let bit_index = if candidate < 64 {
167 candidate as u32
168 } else {
169 current.trailing_zeros()
170 };
171
172 let (bit_mask, previous, acquired) = bits::try_acquire(&self.value, bit_index as u64);
173 if !acquired {
174 std::hint::spin_loop();
175 continue;
176 }
177
178 let remaining = previous & !bit_mask;
179 return Some((bit_index, remaining));
180 }
181 None
182 }
183}
184
185#[derive(Clone, Copy, Debug)]
186pub struct TaskHandle(NonNull<Task>);
187
188impl TaskHandle {
189 #[inline(always)]
190 pub fn from_task(task: &Task) -> Self {
191 TaskHandle(NonNull::from(task))
192 }
193
194 #[inline(always)]
195 pub fn from_non_null(task: NonNull<Task>) -> Self {
196 TaskHandle(task)
197 }
198
199 #[inline(always)]
200 pub fn as_ptr(&self) -> *mut Task {
201 self.0.as_ptr()
202 }
203
204 #[inline(always)]
205 pub fn as_non_null(&self) -> NonNull<Task> {
206 self.0
207 }
208
209 #[inline(always)]
210 pub fn task(&self) -> &Task {
211 unsafe { self.0.as_ref() }
212 }
213
214 #[inline(always)]
215 pub fn leaf_idx(&self) -> usize {
216 self.task().leaf_idx as usize
217 }
218
219 #[inline(always)]
220 pub fn signal_idx(&self) -> usize {
221 self.task().signal_idx as usize
222 }
223
224 #[inline(always)]
225 pub fn bit_idx(&self) -> u8 {
226 self.task().signal_bit
227 }
228
229 #[inline(always)]
230 pub fn global_id(&self, _tasks_per_leaf: usize) -> u32 {
231 self.task().global_id()
232 }
233}
234
235unsafe impl Send for TaskHandle {}
236unsafe impl Sync for TaskHandle {}
237
238#[repr(C)]
239pub struct TaskSlot {
240 task_ptr: AtomicPtr<Task>,
241 active_task_ptr: AtomicPtr<Task>,
242}
243
244impl TaskSlot {
245 #[inline(always)]
246 pub fn new(task_ptr: *mut Task) -> Self {
247 Self {
248 task_ptr: AtomicPtr::new(task_ptr),
249 active_task_ptr: AtomicPtr::new(ptr::null_mut()),
250 }
251 }
252
253 #[inline(always)]
254 pub fn task_ptr(&self) -> *mut Task {
255 self.task_ptr.load(Ordering::Acquire)
256 }
257
258 #[inline(always)]
259 pub fn set_task_ptr(&self, ptr: *mut Task) {
260 self.task_ptr.store(ptr, Ordering::Release);
261 }
262
263 #[inline(always)]
264 pub fn task_ptr_compare_exchange(
265 &self,
266 current: *mut Task,
267 new: *mut Task,
268 ) -> Result<*mut Task, *mut Task> {
269 self.task_ptr
270 .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
271 }
272
273 #[inline(always)]
274 pub fn clear_task_ptr(&self) {
275 self.task_ptr.store(ptr::null_mut(), Ordering::Release);
276 self.active_task_ptr
277 .store(ptr::null_mut(), Ordering::Release);
278 }
279
280 #[inline(always)]
281 pub fn set_active_task_ptr(&self, ptr: *mut Task) {
282 self.active_task_ptr.store(ptr, Ordering::Release);
283 }
284
285 #[inline(always)]
286 pub fn active_task_ptr(&self) -> *mut Task {
287 self.active_task_ptr.load(Ordering::Acquire)
288 }
289
290 #[inline(always)]
291 pub fn clear_active_task_ptr(&self) {
292 self.active_task_ptr
293 .store(ptr::null_mut(), Ordering::Release);
294 }
295}
296
297#[derive(Debug)]
298pub struct ArenaLayout {
299 task_slot_offset: usize,
300 task_offset: usize,
301 total_size: usize,
302 pub signals_per_leaf: usize,
303}
304
305impl ArenaLayout {
306 fn new(config: &TaskArenaConfig) -> Self {
307 let signals_per_leaf = (config.tasks_per_leaf + 63) / 64;
308
309 let task_slot_size =
311 config.leaf_count * config.tasks_per_leaf * std::mem::size_of::<TaskSlot>();
312 let task_size = config.leaf_count * config.tasks_per_leaf * std::mem::size_of::<Task>();
313
314 let mut offset = 0usize;
315 let task_slot_offset = offset;
316 offset += task_slot_size;
317 let task_offset = offset;
318 offset += task_size;
319
320 Self {
321 task_slot_offset,
322 task_offset,
323 total_size: offset,
324 signals_per_leaf,
325 }
326 }
327}
328
329pub(crate) struct FutureAllocator;
330
331impl FutureAllocator {
332 #[inline(always)]
333 pub fn box_future<F>(future: F) -> *mut ()
334 where
335 F: Future<Output = ()> + Send + 'static,
336 {
337 let boxed: BoxFuture = Box::pin(future);
338 Box::into_raw(Box::new(boxed)) as *mut ()
339 }
340
341 #[inline(always)]
342 pub unsafe fn drop_boxed(ptr: *mut ()) {
343 if ptr.is_null() {
344 return;
345 }
346 unsafe {
347 drop(Box::from_raw(ptr as *mut BoxFuture));
348 }
349 }
350
351 #[inline(always)]
352 pub unsafe fn poll_boxed(ptr: *mut (), cx: &mut Context<'_>) -> Option<Poll<()>> {
353 if ptr.is_null() {
354 return None;
355 }
356 unsafe {
357 let future = &mut *(ptr as *mut BoxFuture);
358 Some(future.as_mut().poll(cx))
359 }
360 }
361}
362
/// Per-task counters.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct TaskStats {
    /// Number of polls recorded.
    pub polls: u32,
    /// Number of yields recorded.
    pub yields: u32,
    /// Accumulated poll time in nanoseconds.
    pub cpu_time_ns: u64,
}

impl TaskStats {
    /// Sets every counter back to zero.
    #[inline(always)]
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
379
/// A task record: its position, scheduling state, attached future and
/// statistics.
#[repr(C)]
pub struct Task {
    /// Identifier returned by `global_id()`.
    global_id: u32,
    /// Index of the leaf this task is recorded under.
    leaf_idx: u16,
    /// Index of the task's signal word within its leaf.
    signal_idx: u8,
    /// Bit position of this task inside its signal word.
    signal_bit: u8,
    /// Bitmask of `TASK_SCHEDULED` / `TASK_EXECUTING` flags.
    state: AtomicU8,
    /// Flag set by yield wakers and `mark_yielded`, cleared by `clear_yielded`.
    yielded: AtomicBool,
    /// When true, `poll_future` measures and accumulates the poll duration.
    cpu_time_enabled: AtomicBool,
    /// Signal word whose `signal_bit` is set when the task becomes runnable.
    signal_ptr: *const TaskSignal,
    /// Slot pointing back at this task; it is also the data pointer of wakers.
    slot_ptr: AtomicPtr<TaskSlot>,
    /// Summary tree notified when the signal bit is newly set; may be null.
    summary_tree_ptr: *const Summary,
    /// Type-erased pointer to the attached future, or null when none is attached.
    future_ptr: AtomicPtr<()>,
    /// Type-erased pointer to a boxed `Box<dyn Iterator<Item = usize>>`, or null.
    pinned_generator_ptr: AtomicPtr<()>,
    /// A `GeneratorRunMode` discriminant stored as `u8`.
    generator_run_mode: AtomicU8,
    /// Counters mutated through `&self` without synchronization.
    /// NOTE(review): presumably only the thread executing the task touches them.
    stats: UnsafeCell<TaskStats>,
}

// SAFETY: NOTE(review) asserted by hand because of the raw pointers and the
// `UnsafeCell`; soundness depends on how the rest of the runtime accesses them.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
404
/// How a task's pinned generator is driven; stored in `Task` as a `u8`.
///
/// NOTE(review): the meaning of `Switch` and `Poll` is defined by code outside
/// this file; confirm before relying on the names.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GeneratorRunMode {
    /// No generator mode; also what any unrecognized stored value decodes to.
    None = 0,
    /// Stored as 1.
    Switch = 1,
    /// Stored as 2.
    Poll = 2,
}
416
417impl Task {
418 const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
419 Self::waker_clone,
420 Self::waker_wake,
421 Self::waker_wake_by_ref,
422 Self::waker_drop,
423 );
424
425 const WAKER_YIELD_VTABLE: RawWakerVTable = RawWakerVTable::new(
426 Self::waker_clone,
427 Self::waker_yield_wake,
428 Self::waker_yield_wake_by_ref,
429 Self::waker_drop,
430 );
431
432 unsafe fn construct(
433 ptr: *mut Task,
434 global_id: u32,
435 leaf_idx: u16,
436 signal_idx: u8,
437 signal_bit: u8,
438 signal_ptr: *const TaskSignal,
439 slot_ptr: *mut TaskSlot,
440 ) {
441 unsafe {
442 ptr::write(
443 ptr,
444 Task {
445 global_id,
446 leaf_idx,
447 signal_idx,
448 signal_bit,
449 signal_ptr,
450 slot_ptr: AtomicPtr::new(slot_ptr),
451 summary_tree_ptr: ptr::null(),
452 state: AtomicU8::new(TASK_IDLE),
453 yielded: AtomicBool::new(false),
454 cpu_time_enabled: AtomicBool::new(false),
455 future_ptr: AtomicPtr::new(ptr::null_mut()),
456 pinned_generator_ptr: AtomicPtr::new(ptr::null_mut()),
457 generator_run_mode: AtomicU8::new(GeneratorRunMode::None as u8),
458 stats: UnsafeCell::new(TaskStats::default()),
459 },
460 );
461 (*slot_ptr).set_task_ptr(ptr);
462 }
463 }
464
465 #[inline]
471 unsafe fn bind_summary_tree(&mut self, summary_tree: *const Summary) {
472 self.summary_tree_ptr = summary_tree;
473 }
474
475 pub fn global_id(&self) -> u32 {
476 self.global_id
477 }
478
479 #[inline(always)]
480 pub fn leaf_idx(&self) -> u16 {
481 self.leaf_idx
482 }
483
484 #[inline(always)]
485 pub fn signal_idx(&self) -> u8 {
486 self.signal_idx
487 }
488
489 #[inline(always)]
490 pub fn signal_bit(&self) -> u8 {
491 self.signal_bit
492 }
493
494 #[inline(always)]
495 pub fn stats(&self) -> TaskStats {
496 unsafe { *self.stats.get() }
497 }
498
499 #[inline(always)]
500 pub fn set_cpu_time_tracking(&self, enabled: bool) {
501 self.cpu_time_enabled.store(enabled, Ordering::Relaxed);
502 }
503
504 #[inline(always)]
505 fn record_poll(&self) {
506 unsafe {
507 let stats = &mut *self.stats.get();
508 stats.polls = stats.polls.saturating_add(1);
509 }
510 }
511
512 #[inline(always)]
513 pub(crate) fn record_yield(&self) {
514 unsafe {
515 let stats = &mut *self.stats.get();
516 stats.yields = stats.yields.saturating_add(1);
517 }
518 }
519
520 #[inline(always)]
521 fn record_cpu_time(&self, duration: Duration) {
522 let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
523 unsafe {
524 let stats = &mut *self.stats.get();
525 stats.cpu_time_ns = stats.cpu_time_ns.saturating_add(nanos);
526 }
527 }
528
529 #[inline(always)]
530 pub fn slot(&self) -> Option<NonNull<TaskSlot>> {
531 NonNull::new(self.slot_ptr.load(Ordering::Acquire))
532 }
533
534 #[inline(always)]
535 pub fn clear_slot(&self) {
536 self.slot_ptr.store(ptr::null_mut(), Ordering::Release);
537 }
538
539 #[inline(always)]
540 pub fn state(&self) -> &AtomicU8 {
541 &self.state
542 }
543
544 #[inline(always)]
595 pub fn schedule(&self) {
596 if (self.state.load(Ordering::Acquire) & TASK_SCHEDULED) != TASK_IDLE {
597 return;
598 }
599
600 let previous_flags = self.state.fetch_or(TASK_SCHEDULED, Ordering::Release);
601 let scheduled_nor_executing =
602 (previous_flags & (TASK_SCHEDULED | TASK_EXECUTING)) == TASK_IDLE;
603
604 if scheduled_nor_executing {
605 let signal = unsafe { &*self.signal_ptr };
606 let (_was_empty, was_set) = signal.set(self.signal_bit as u64);
607 if was_set && !self.summary_tree_ptr.is_null() {
608 unsafe {
609 (*self.summary_tree_ptr)
610 .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
611 }
612 }
613 }
614 }
615
616 #[inline(always)]
617 pub(crate) fn try_begin_inline(&self) -> Result<(), u8> {
618 self.state
619 .compare_exchange(
620 TASK_IDLE,
621 TASK_EXECUTING,
622 Ordering::AcqRel,
623 Ordering::Acquire,
624 )
625 .map(|_| ())
626 .map_err(|current| current)
627 }
628
629 #[inline(always)]
666 pub(crate) fn begin(&self) {
667 self.state.store(TASK_EXECUTING, Ordering::Release);
668 }
669
670 #[inline(always)]
719 pub(crate) fn finish(&self) {
720 let after_flags = self.state.fetch_sub(TASK_EXECUTING, Ordering::AcqRel);
721 if after_flags & TASK_SCHEDULED != TASK_IDLE {
722 let signal = unsafe { &*self.signal_ptr };
723 let (_was_empty, was_set) = signal.set(self.signal_bit as u64);
724 if was_set && !self.summary_tree_ptr.is_null() {
725 unsafe {
726 (*self.summary_tree_ptr)
727 .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
728 }
729 }
730 }
731 }
732
733 #[inline(always)]
791 pub(crate) fn finish_and_schedule(&self) {
792 self.state.store(TASK_SCHEDULED, Ordering::Release);
793 let signal = unsafe { &*self.signal_ptr };
794 let (was_empty, was_set) = signal.set(self.signal_bit as u64);
795 if was_empty && was_set && !self.summary_tree_ptr.is_null() {
796 unsafe {
797 (*self.summary_tree_ptr)
798 .mark_signal_active(self.leaf_idx as usize, self.signal_idx as usize);
799 }
800 }
801 }
802
803 #[inline(always)]
804 pub(crate) fn clear_yielded(&self) {
805 self.yielded.store(false, Ordering::Relaxed);
806 }
807
808 #[inline(always)]
810 pub(crate) fn mark_yielded(&self) {
811 self.yielded.store(true, Ordering::Relaxed);
812 }
813
814 #[inline(always)]
815 pub fn is_yielded(&self) -> bool {
816 self.yielded.load(Ordering::Relaxed)
817 }
818
819 #[inline(always)]
820 pub unsafe fn waker_yield(&self) -> Waker {
821 let slot_ptr = self.slot_ptr.load(Ordering::Acquire);
822 debug_assert!(!slot_ptr.is_null(), "task is missing slot pointer");
823 let ptr = slot_ptr as *const ();
824 unsafe { Waker::from_raw(RawWaker::new(ptr, &Self::WAKER_YIELD_VTABLE)) }
825 }
826
827 #[inline(always)]
828 unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
829 RawWaker::new(ptr, &Self::WAKER_VTABLE)
830 }
831
832 #[inline(always)]
833 unsafe fn waker_yield_wake(ptr: *const ()) {
834 let slot = unsafe { &*(ptr as *const TaskSlot) };
835 let task_ptr = slot.task_ptr();
836 if task_ptr.is_null() {
837 return;
838 }
839 let task = unsafe { &*task_ptr };
840 task.yielded.store(true, Ordering::Relaxed);
841 }
842
843 #[inline(always)]
844 unsafe fn waker_yield_wake_by_ref(ptr: *const ()) {
845 let slot = unsafe { &*(ptr as *const TaskSlot) };
846 let task_ptr = slot.task_ptr();
847 if task_ptr.is_null() {
848 return;
849 }
850 let task = unsafe { &*task_ptr };
851 task.yielded.store(true, Ordering::Relaxed);
852 }
853
854 #[inline(always)]
855 unsafe fn waker_wake(ptr: *const ()) {
856 let slot = unsafe { &*(ptr as *const TaskSlot) };
857 let task_ptr = slot.task_ptr();
858 if task_ptr.is_null() {
859 return;
860 }
861 let task = unsafe { &*task_ptr };
862 task.schedule();
863 }
864
865 #[inline(always)]
866 unsafe fn waker_wake_by_ref(ptr: *const ()) {
867 let slot = unsafe { &*(ptr as *const TaskSlot) };
868 let task_ptr = slot.task_ptr();
869 if task_ptr.is_null() {
870 return;
871 }
872 let task = unsafe { &*task_ptr };
873 task.schedule();
874 }
875
876 #[inline(always)]
877 unsafe fn waker_drop(_: *const ()) {}
878
879 #[inline(always)]
880 pub unsafe fn poll_future(&self, cx: &mut Context<'_>) -> Option<Poll<()>> {
881 let ptr = self.future_ptr.load(Ordering::Acquire);
882 if ptr.is_null() {
883 return None;
884 }
885 self.record_poll();
886 if self.cpu_time_enabled.load(Ordering::Relaxed) {
887 let start = Instant::now();
888 let result = unsafe { FutureAllocator::poll_boxed(ptr, cx) };
889 self.record_cpu_time(start.elapsed());
890 result
891 } else {
892 unsafe { FutureAllocator::poll_boxed(ptr, cx) }
893 }
894 }
895
896 #[inline(always)]
897 pub fn attach_future(&self, future_ptr: *mut ()) -> Result<(), *mut ()> {
898 self.future_ptr
899 .compare_exchange(
900 ptr::null_mut(),
901 future_ptr,
902 Ordering::AcqRel,
903 Ordering::Acquire,
904 )
905 .map(|_| ())
906 .map_err(|existing| existing)
907 }
908
909 #[inline(always)]
910 pub fn take_future(&self) -> Option<*mut ()> {
911 let ptr = self.future_ptr.swap(ptr::null_mut(), Ordering::AcqRel);
912 if ptr.is_null() { None } else { Some(ptr) }
913 }
914
915 #[inline(always)]
922 pub unsafe fn reset(
923 &mut self,
924 global_id: u32,
925 leaf_idx: u16,
926 signal_idx: u8,
927 signal_bit: u8,
928 signal_ptr: *const TaskSignal,
929 slot_ptr: *mut TaskSlot,
930 ) {
931 self.global_id = global_id;
932 self.leaf_idx = leaf_idx;
933 self.signal_idx = signal_idx;
934 self.signal_bit = signal_bit;
935 self.signal_ptr = signal_ptr;
936 self.slot_ptr.store(slot_ptr, Ordering::Release);
937 let slot = unsafe { &*slot_ptr };
938 slot.set_task_ptr(self as *mut Task);
939 self.state.store(TASK_IDLE, Ordering::Relaxed);
940 self.yielded.store(false, Ordering::Relaxed);
941 self.cpu_time_enabled.store(false, Ordering::Relaxed);
942 self.future_ptr.store(ptr::null_mut(), Ordering::Relaxed);
943 self.pinned_generator_ptr.store(ptr::null_mut(), Ordering::Relaxed);
944 self.generator_run_mode.store(GeneratorRunMode::None as u8, Ordering::Relaxed);
945 unsafe {
946 let stats = &mut *self.stats.get();
947 stats.reset();
948 }
949 }
950
951 #[inline(always)]
953 pub fn has_pinned_generator(&self) -> bool {
954 !self.pinned_generator_ptr.load(Ordering::Acquire).is_null()
955 }
956
957 #[inline(always)]
960 pub unsafe fn get_pinned_generator<'a>(&self) -> Option<&'a mut Box<dyn Iterator<Item = usize> + 'static>> {
961 let ptr = self.pinned_generator_ptr.load(Ordering::Acquire);
962 if ptr.is_null() {
963 None
964 } else {
965 unsafe { Some(&mut *(ptr as *mut Box<dyn Iterator<Item = usize> + 'static>)) }
966 }
967 }
968
969 #[inline(always)]
972 pub unsafe fn pin_generator(&self, generator_box: Box<dyn Iterator<Item = usize> + 'static>, mode: GeneratorRunMode) {
973 let ptr = Box::into_raw(Box::new(generator_box)) as *mut ();
974 self.pinned_generator_ptr.store(ptr, Ordering::Release);
975 self.generator_run_mode.store(mode as u8, Ordering::Release);
976 }
977
978 #[inline(always)]
980 pub fn generator_run_mode(&self) -> GeneratorRunMode {
981 let mode = self.generator_run_mode.load(Ordering::Acquire);
982 match mode {
983 1 => GeneratorRunMode::Switch,
984 2 => GeneratorRunMode::Poll,
985 _ => GeneratorRunMode::None,
986 }
987 }
988
989 #[inline(always)]
992 pub unsafe fn set_generator_run_mode(&self, mode: GeneratorRunMode) {
993 self.generator_run_mode.store(mode as u8, Ordering::Release);
994 }
995
996 #[inline(always)]
999 pub unsafe fn take_pinned_generator(&self) -> Option<Box<Box<dyn Iterator<Item = usize> + 'static>>> {
1000 let ptr = self.pinned_generator_ptr.swap(ptr::null_mut(), Ordering::AcqRel);
1001 if ptr.is_null() {
1002 None
1003 } else {
1004 unsafe { Some(Box::from_raw(ptr as *mut Box<dyn Iterator<Item = usize> + 'static>)) }
1005 }
1006 }
1007}
1008
/// Owner of the mapped memory that holds every `TaskSlot` and `Task`, plus the
/// per-leaf signal words.
pub struct TaskArena {
    /// Start of the mapped region.
    memory: NonNull<u8>,
    /// Length in bytes passed to `munmap` on drop.
    size: usize,
    /// Sizing parameters the arena was built with.
    config: TaskArenaConfig,
    /// Byte offsets of the slot and task arrays inside `memory`.
    layout: ArenaLayout,
    /// `leaf_count * signals_per_leaf` signal words, stored leaf by leaf.
    task_signals: Box<[TaskSignal]>,
    /// Counter adjusted by `increment_total_tasks` / `decrement_total_tasks`.
    total_tasks: AtomicU64,
    /// Set by `close()`.
    is_closed: AtomicBool,
}
1018
/// Reasons a spawn attempt can be rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpawnError {
    /// The arena has been closed.
    Closed,
    /// No task slot is available.
    NoCapacity,
    /// The chosen task slot already has a future attached.
    AttachFailed,
}

impl fmt::Display for SpawnError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match *self {
            SpawnError::Closed => "executor arena is closed",
            SpawnError::NoCapacity => "no task slots available",
            SpawnError::AttachFailed => "task slot already has a future attached",
        };
        f.write_str(message)
    }
}

impl std::error::Error for SpawnError {}
1041
1042unsafe impl Send for TaskArena {}
1043unsafe impl Sync for TaskArena {}
1044
1045impl TaskArena {
1046 pub fn with_config(config: TaskArenaConfig, options: TaskArenaOptions) -> io::Result<Self> {
1047 let layout = ArenaLayout::new(&config);
1048 let memory_ptr = Self::allocate_memory(layout.total_size, &options)?;
1049 let memory = NonNull::new(memory_ptr)
1050 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "allocation returned null"))?;
1051
1052 if options.preinitialize_tasks {
1053 unsafe {
1054 ptr::write_bytes(memory.as_ptr(), 0, layout.total_size);
1055 }
1056 }
1057
1058 let signal_count = config.leaf_count * layout.signals_per_leaf;
1060 let task_signals = (0..signal_count)
1061 .map(|_| TaskSignal::new())
1062 .collect::<Vec<_>>()
1063 .into_boxed_slice();
1064
1065 let arena = TaskArena {
1066 memory,
1067 size: layout.total_size,
1068 config,
1069 layout,
1070 task_signals,
1071 total_tasks: AtomicU64::new(0),
1072 is_closed: AtomicBool::new(false),
1073 };
1074
1075 arena.initialize_task_slots();
1076 if options.preinitialize_tasks {
1077 arena.initialize_tasks();
1078 }
1079 Ok(arena)
1080 }
1081
1082 pub fn new(leaf_count: usize, tasks_per_leaf: usize) -> io::Result<Self> {
1083 Self::with_config(
1084 TaskArenaConfig::new(leaf_count, tasks_per_leaf)?,
1085 TaskArenaOptions::default(),
1086 )
1087 }
1088
1089 #[inline]
1090 pub fn is_closed(&self) -> bool {
1091 self.is_closed.load(Ordering::Acquire)
1092 }
1093
1094 #[inline]
1095 pub fn close(&self) {
1096 self.is_closed.store(true, Ordering::Release);
1097 }
1098
1099 #[inline]
1100 pub fn config(&self) -> &TaskArenaConfig {
1101 &self.config
1102 }
1103
1104 #[inline]
1105 pub fn layout(&self) -> &ArenaLayout {
1106 &self.layout
1107 }
1108
1109 #[inline]
1110 pub fn increment_total_tasks(&self) {
1111 self.total_tasks.fetch_add(1, Ordering::Relaxed);
1112 }
1113
1114 #[inline]
1115 pub fn decrement_total_tasks(&self) {
1116 self.total_tasks.fetch_sub(1, Ordering::Relaxed);
1117 }
1118
1119 fn initialize_task_slots(&self) {
1120 let total = self.config.leaf_count * self.config.tasks_per_leaf;
1121 let slots_ptr = self.task_slots_ptr();
1122
1123 unsafe {
1124 for idx in 0..total {
1125 let slot_ptr = slots_ptr.add(idx);
1126 ptr::write(slot_ptr, TaskSlot::new(ptr::null_mut()));
1127 }
1128 }
1129 }
1130
1131 fn initialize_tasks(&self) {
1132 let tasks_per_leaf = self.config.tasks_per_leaf;
1133 let tasks_ptr = self.tasks_ptr();
1134
1135 unsafe {
1136 for leaf in 0..self.config.leaf_count {
1137 for slot in 0..tasks_per_leaf {
1138 let idx = leaf * tasks_per_leaf + slot;
1139 let signal_idx = slot / 64;
1140 let signal_bit = (slot % 64) as u8;
1141 let signal_ptr = self.task_signal_ptr(leaf, signal_idx);
1142 let global_id = (leaf * tasks_per_leaf + slot) as u32;
1143 let task_ptr = tasks_ptr.add(idx);
1144 let slot_ptr = self.task_slot_ptr(leaf, slot);
1145 Task::construct(
1146 task_ptr,
1147 global_id,
1148 leaf as u16,
1149 signal_idx as u8,
1150 signal_bit,
1151 signal_ptr,
1152 slot_ptr,
1153 );
1154 }
1156 }
1157 }
1158 }
1159
1160 #[inline]
1161 fn tasks_ptr(&self) -> *mut Task {
1162 unsafe { self.memory.as_ptr().add(self.layout.task_offset) as *mut Task }
1163 }
1164
1165 #[inline]
1166 fn task_slots_ptr(&self) -> *mut TaskSlot {
1167 unsafe { self.memory.as_ptr().add(self.layout.task_slot_offset) as *mut TaskSlot }
1168 }
1169
1170 #[inline]
1171 fn task_slot_ptr(&self, leaf_idx: usize, slot_idx: usize) -> *mut TaskSlot {
1172 debug_assert!(leaf_idx < self.config.leaf_count);
1173 debug_assert!(slot_idx < self.config.tasks_per_leaf);
1174 unsafe {
1175 self.task_slots_ptr()
1176 .add(leaf_idx * self.config.tasks_per_leaf + slot_idx)
1177 }
1178 }
1179
1180 #[inline]
1181 pub fn task_signal_ptr(&self, leaf_idx: usize, signal_idx: usize) -> *const TaskSignal {
1182 let index = leaf_idx * self.layout.signals_per_leaf + signal_idx;
1183 &self.task_signals[index] as *const TaskSignal
1184 }
1185
1186 #[inline]
1187 pub fn active_signals(&self, leaf_idx: usize) -> *const TaskSignal {
1188 debug_assert!(leaf_idx < self.config.leaf_count);
1189 let index = leaf_idx * self.layout.signals_per_leaf;
1190 &self.task_signals[index] as *const TaskSignal
1191 }
1192
1193 #[inline]
1194 pub fn leaf_count(&self) -> usize {
1195 self.config.leaf_count
1196 }
1197
1198 #[inline]
1199 pub fn signals_per_leaf(&self) -> usize {
1200 self.layout.signals_per_leaf
1201 }
1202
1203 #[inline]
1204 pub fn tasks_per_leaf(&self) -> usize {
1205 self.config.tasks_per_leaf
1206 }
1207
1208 #[inline]
1209 pub fn compose_id(&self, leaf_idx: usize, slot_idx: usize) -> u32 {
1210 (leaf_idx * self.config.tasks_per_leaf + slot_idx) as u32
1211 }
1212
1213 #[inline]
1214 pub fn decompose_id(&self, global_id: u32) -> (usize, usize) {
1215 let tasks_per_leaf = self.config.tasks_per_leaf;
1216 let leaf_idx = (global_id as usize) / tasks_per_leaf;
1217 let slot_idx = (global_id as usize) % tasks_per_leaf;
1218 (leaf_idx, slot_idx)
1219 }
1220
1221 #[inline]
1222 pub unsafe fn task(&self, leaf_idx: usize, slot_idx: usize) -> &Task {
1223 debug_assert!(leaf_idx < self.config.leaf_count);
1224 debug_assert!(slot_idx < self.config.tasks_per_leaf);
1225 let signal_idx = slot_idx / 64;
1226 let bit_idx = (slot_idx % 64) as u8;
1227 let task_ptr = self
1228 .ensure_task_initialized(leaf_idx, signal_idx, bit_idx)
1229 .expect("task slot not initialized");
1230 unsafe { &*task_ptr.as_ptr() }
1231 }
1232
1233 fn ensure_task_initialized(
1234 &self,
1235 leaf_idx: usize,
1236 signal_idx: usize,
1237 bit_idx: u8,
1238 ) -> Option<NonNull<Task>> {
1239 let slot_idx = signal_idx * 64 + bit_idx as usize;
1240 if slot_idx >= self.config.tasks_per_leaf {
1241 return None;
1242 }
1243
1244 let slot_ptr = self.task_slot_ptr(leaf_idx, slot_idx);
1245 let slot = unsafe { &*slot_ptr };
1246
1247 let existing = slot.task_ptr();
1249 if !existing.is_null() {
1250 return NonNull::new(existing);
1251 }
1252
1253 debug_assert!(signal_idx < self.layout.signals_per_leaf);
1254 debug_assert!(bit_idx < 64);
1255
1256 let idx = leaf_idx * self.config.tasks_per_leaf + slot_idx;
1257 let task_ptr = unsafe { self.tasks_ptr().add(idx) };
1258 let signal_ptr = self.task_signal_ptr(leaf_idx, signal_idx);
1259 let global_id = self.compose_id(leaf_idx, slot_idx);
1260
1261 let sentinel = 0x1 as *mut Task; match slot.task_ptr_compare_exchange(ptr::null_mut(), sentinel) {
1265 Ok(_) => {
1266 unsafe {
1268 Task::construct(
1269 task_ptr,
1270 global_id,
1271 leaf_idx as u16,
1272 signal_idx as u8,
1273 bit_idx,
1274 signal_ptr,
1275 slot_ptr,
1276 );
1277 slot.set_task_ptr(task_ptr);
1281 }
1282 NonNull::new(task_ptr)
1283 }
1284 Err(actual) => {
1285 loop {
1288 let ptr = slot.task_ptr();
1289 if ptr != sentinel && !ptr.is_null() {
1290 return NonNull::new(ptr);
1291 }
1292 std::hint::spin_loop();
1293 }
1294 }
1295 }
1296 }
1297
1298 #[inline]
1299 pub fn handle_for_location(
1300 &self,
1301 leaf_idx: usize,
1302 signal_idx: usize,
1303 bit_idx: u8,
1304 ) -> Option<TaskHandle> {
1305 self.ensure_task_initialized(leaf_idx, signal_idx, bit_idx)
1306 .map(TaskHandle::from_non_null)
1307 }
1308
1309 pub fn init_task(&self, global_id: u32, summary_tree: *const Summary) {
1310 let (leaf_idx, slot_idx) = self.decompose_id(global_id);
1311 let signal_idx = slot_idx / 64;
1312 let signal_bit = (slot_idx % 64) as u8;
1313
1314 debug_assert!(signal_idx < self.layout.signals_per_leaf);
1315
1316 let task_ptr = self
1317 .ensure_task_initialized(leaf_idx, signal_idx, signal_bit)
1318 .expect("failed to initialize task slot");
1319
1320 unsafe {
1321 let task = &mut *task_ptr.as_ptr();
1322 let slot_ptr = self.task_slot_ptr(leaf_idx, slot_idx);
1323 let signal_ptr = self.task_signal_ptr(leaf_idx, signal_idx);
1324 task.reset(
1325 global_id,
1326 leaf_idx as u16,
1327 signal_idx as u8,
1328 signal_bit,
1329 signal_ptr,
1330 slot_ptr,
1331 );
1332 if task.summary_tree_ptr.is_null() {
1333 task.bind_summary_tree(summary_tree);
1334 }
1335 }
1336 }
1337
1338 #[allow(dead_code)]
1343 pub fn schedule_task_timer(
1344 &self,
1345 task: TaskHandle,
1346 timer: &TimerHandle,
1347 worker_id: u32,
1348 deadline_ns: u64,
1349 ) {
1350 let _ = (task, timer, worker_id, deadline_ns);
1351 }
1353
1354 #[inline(always)]
1355 pub(crate) fn task_handle_from_payload(ptr: *mut ()) -> Option<TaskHandle> {
1356 NonNull::new(ptr as *mut Task).map(TaskHandle::from_non_null)
1357 }
1358
1359 pub fn stats(&self) -> TaskArenaStats {
1360 TaskArenaStats {
1361 total_capacity: self.config.leaf_count * self.config.tasks_per_leaf,
1362 active_tasks: self.total_tasks.load(Ordering::Relaxed) as usize,
1363 worker_count: 0, }
1365 }
1366
1367 fn allocate_memory(size: usize, options: &TaskArenaOptions) -> io::Result<*mut u8> {
1368 #[cfg(unix)]
1369 {
1370 let mut flags = MAP_PRIVATE | MAP_ANONYMOUS;
1371 #[cfg(target_os = "linux")]
1372 if options.use_huge_pages {
1373 flags |= MAP_HUGETLB | MAP_HUGE_2MB;
1374 }
1375 let ptr = unsafe { mmap(ptr::null_mut(), size, PROT_READ | PROT_WRITE, flags, -1, 0) };
1376 if ptr == MAP_FAILED {
1377 Err(io::Error::last_os_error())
1378 } else {
1379 Ok(ptr as *mut u8)
1380 }
1381 }
1382
1383 #[cfg(windows)]
1384 {
1385 let mut flags = MEM_RESERVE | MEM_COMMIT;
1386 if options.use_huge_pages {
1387 flags |= MEM_LARGE_PAGES;
1388 }
1389 let ptr = unsafe { VirtualAlloc(ptr::null_mut(), size, flags, PAGE_READWRITE) };
1390 if ptr.is_null() {
1391 Err(io::Error::last_os_error())
1392 } else {
1393 Ok(ptr as *mut u8)
1394 }
1395 }
1396 }
1397}
1398
/// Releases the arena's mapped memory.
///
/// Only the mapping is released; nothing here runs destructors for tasks or
/// for futures still attached to them.
impl Drop for TaskArena {
    fn drop(&mut self) {
        // SAFETY: `memory` and `size` describe the mapping created for this
        // arena; the return values of the release calls are ignored.
        unsafe {
            #[cfg(unix)]
            {
                munmap(self.memory.as_ptr() as *mut _, self.size);
            }

            #[cfg(windows)]
            {
                // With `MEM_RELEASE` the size argument is passed as 0.
                VirtualFree(self.memory.as_ptr() as *mut _, 0, MEM_RELEASE);
            }
        }
    }
}
1414
/// Snapshot returned by `TaskArena::stats`.
#[derive(Clone, Copy, Debug)]
pub struct TaskArenaStats {
    /// `leaf_count * tasks_per_leaf` of the arena.
    pub total_capacity: usize,
    /// Value of the arena's task counter at the time of the snapshot.
    pub active_tasks: usize,
    /// Number of workers; `TaskArena::stats` always reports 0.
    pub worker_count: usize,
}
1421
1422#[cfg(test)]
1424mod tests {
1425 use super::*;
1426 use crate::runtime::worker::Worker;
1427 use std::future::poll_fn;
1428 use std::mem::{self, MaybeUninit};
1429 use std::ptr;
1430 use std::sync::Arc;
1431 use std::sync::atomic::{AtomicUsize, Ordering};
1432 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1433
// Vtable hook: cloning a no-op waker yields another no-op waker.
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

// Vtable hook shared by wake, wake_by_ref and drop: does nothing.
unsafe fn noop(_data: *const ()) {}

// Vtable whose wake and drop hooks all do nothing.
static NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

// Builds a waker that ignores every wake-up.
fn noop_waker() -> Waker {
    let raw = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);
    // SAFETY: the vtable functions never dereference the data pointer.
    unsafe { Waker::from_raw(raw) }
}
1445
1446 fn setup_arena(leaf_count: usize, tasks_per_leaf: usize) -> Arc<TaskArena> {
1447 let config = TaskArenaConfig::new(leaf_count, tasks_per_leaf).unwrap();
1448 Arc::new(TaskArena::with_config(config, TaskArenaOptions::default()).unwrap())
1449 }
1450
// Set, acquire and clear a single bit, checking the reported state each time.
#[test]
fn task_signal_basic_operations() {
    let signal = TaskSignal::new();

    // First set on an empty word: word was empty and the bit is new.
    let (word_was_empty, bit_was_new) = signal.set(5);
    assert!(word_was_empty && bit_was_new);
    assert!(signal.is_set(5));

    // Acquiring removes the bit.
    let (after_acquire, acquired) = signal.try_acquire(5);
    assert!(acquired);
    assert_eq!(after_acquire & (1 << 5), 0);
    assert!(!signal.is_set(5));

    // Clearing an already-cleared bit leaves an empty word.
    let (after_clear, now_empty) = signal.clear(5);
    assert_eq!(after_clear, 0);
    assert!(now_empty);
}
1468
1469 #[test]
1470 fn task_signal_set_idempotent() {
1471 let signal = TaskSignal::new();
1472 assert_eq!(signal.set(3), (true, true));
1473 assert_eq!(signal.set(3), (false, false));
1474 assert!(signal.is_set(3));
1475 }
1476
1477 #[test]
1478 fn task_signal_clear_noop_when_absent() {
1479 let signal = TaskSignal::new();
1480 let (remaining, now_empty) = signal.clear(7);
1481 assert_eq!(remaining, 0);
1482 assert!(now_empty);
1483 }
1484
1485 #[test]
1486 fn task_signal_try_acquire_unset_bit() {
1487 let signal = TaskSignal::new();
1488 let (remaining, acquired) = signal.try_acquire(12);
1489 assert_eq!(remaining, 0);
1490 assert!(!acquired);
1491 }
1492
1493 #[test]
1494 fn task_signal_try_acquire_from_wraps() {
1495 let signal = TaskSignal::new();
1496 signal.set(2);
1497 signal.set(60);
1498
1499 let (bit, _) = signal
1500 .try_acquire_from(59)
1501 .expect("expected to acquire bit after wrap");
1502 assert_eq!(bit, 60);
1503
1504 let (bit, _) = signal
1505 .try_acquire_from(61)
1506 .expect("expected to wrap to remaining bit");
1507 assert_eq!(bit, 2);
1508 assert!(signal.try_acquire_from(0).is_none());
1509 }
1510
1511 #[test]
1512 fn task_signal_try_acquire_from_until_empty() {
1513 let signal = TaskSignal::new();
1514 signal.set(0);
1515 signal.set(1);
1516
1517 assert!(signal.try_acquire_from(0).is_some());
1518 assert!(signal.try_acquire_from(0).is_some());
1519 assert!(signal.try_acquire_from(0).is_none());
1520 assert_eq!(signal.load(Ordering::Relaxed), 0);
1521 }
1522
1523 #[test]
1524 fn task_signal_try_acquire_from_selects_nearest() {
1525 let signal = TaskSignal::new();
1526 for bit in [2u64, 6, 10] {
1527 signal.set(bit);
1528 }
1529 let (bit, remaining) = signal
1530 .try_acquire_from(5)
1531 .expect("expected to acquire a bit");
1532 assert_eq!(bit, 6);
1533 assert_ne!(remaining & (1 << 2), 0);
1534 }
1535
1536 #[test]
1537 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1538 #[cfg(feature = "disabled_tests")]
1539 fn schedule_begin_finish_flow_clears_summary() {
1540 let arena = setup_arena(1, 64);
1541 let handle = arena.reserve_task().expect("reserve task");
1542 let leaf = handle.leaf_idx();
1543 let signal_idx = handle.signal_idx();
1544 let bit_idx = handle.bit_idx();
1545 let global = handle.global_id(arena.tasks_per_leaf());
1546 arena.init_task(global);
1547
1548 let slot_idx = signal_idx * 64 + bit_idx as usize;
1549 let task = unsafe { arena.task(leaf, slot_idx) };
1550 let signal = unsafe { &*task.signal_ptr };
1551
1552 assert_eq!(signal.load(Ordering::Relaxed), 0);
1553 task.schedule();
1554 assert!(signal.is_set(task.signal_bit));
1555 assert_ne!(
1556 arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
1557 0
1558 );
1559
1560 let (remaining, acquired) = signal.try_acquire(bit_idx);
1561 assert!(acquired);
1562 if remaining == 0 {
1563 arena.active_tree().mark_signal_inactive(leaf, signal_idx);
1564 }
1565 task.begin();
1566 task.finish();
1567 assert_eq!(
1568 arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
1569 0
1570 );
1571
1572 arena.release_task(handle);
1573 }
1574
1575 #[test]
1576 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1577 #[cfg(feature = "disabled_tests")]
1578 fn finish_reschedules_when_work_arrives_during_execution() {
1579 let arena = setup_arena(1, 64);
1580 let handle = arena.reserve_task().expect("reserve task");
1581 let leaf = handle.leaf_idx();
1582 let signal_idx = handle.signal_idx();
1583 let bit_idx = handle.bit_idx();
1584 let global = handle.global_id(arena.tasks_per_leaf());
1585 arena.init_task(global);
1586 let slot_idx = signal_idx * 64 + bit_idx as usize;
1587 let task = unsafe { arena.task(leaf, slot_idx) };
1588 let signal = unsafe { &*task.signal_ptr };
1589
1590 task.schedule();
1591 let (remaining, acquired) = signal.try_acquire(bit_idx);
1592 assert!(acquired);
1593 if remaining == 0 {
1594 arena.active_tree().mark_signal_inactive(leaf, signal_idx);
1595 }
1596 task.begin();
1597 task.schedule();
1599 task.finish();
1600 assert_ne!(
1601 arena.active_summary(leaf).load(Ordering::Acquire) & (1 << signal_idx),
1602 0,
1603 "queue should remain visible after concurrent schedule"
1604 );
1605
1606 arena.deactivate_task(handle);
1607 arena.release_task(handle);
1608 }
1609
1610 #[test]
1611 fn task_handle_reports_task_fields() {
1612 let arena = setup_arena(4, 128);
1613 let leaf = 2;
1614 let slot = 113;
1615 let signal = slot / 64;
1616 let bit = (slot % 64) as u8;
1617 let task = unsafe { arena.task(leaf, slot) };
1618 let handle = TaskHandle::from_task(task);
1619 assert_eq!(handle.leaf_idx(), leaf);
1620 assert_eq!(handle.signal_idx(), signal);
1621 assert_eq!(handle.bit_idx(), bit);
1622 }
1623
1624 #[test]
1625 fn task_handle_global_id_matches_components() {
1626 let arena = setup_arena(2, 128);
1627 let leaf = 1;
1628 let slot = 70;
1629 let task = unsafe { arena.task(leaf, slot) };
1630 let handle = TaskHandle::from_task(task);
1631 assert_eq!(handle.global_id(arena.tasks_per_leaf()), task.global_id());
1632 }
1633
1634 #[test]
1635 fn future_helpers_drop_boxed_accepts_null() {
1636 unsafe {
1637 FutureAllocator::drop_boxed(ptr::null_mut());
1638 }
1639 }
1640
1641 #[test]
1642 fn future_helpers_poll_boxed_accepts_null() {
1643 let waker = noop_waker();
1644 let mut cx = Context::from_waker(&waker);
1645 assert!(unsafe { FutureAllocator::poll_boxed(ptr::null_mut(), &mut cx) }.is_none());
1646 }
1647
1648 #[test]
1649 #[cfg(feature = "disabled_tests")] fn task_construct_initializes_fields() {
1651 let mut storage = MaybeUninit::<Task>::uninit();
1652 let mut slot_storage = MaybeUninit::<TaskSlot>::uninit();
1653 let signal = TaskSignal::new();
1654 unsafe {
1655 slot_storage
1656 .as_mut_ptr()
1657 .write(TaskSlot::new(storage.as_mut_ptr()));
1658 Task::construct(
1659 storage.as_mut_ptr(),
1660 42,
1661 1,
1662 2,
1663 3,
1664 4,
1665 &signal as *const _,
1666 slot_storage.as_mut_ptr(),
1667 );
1668 let task = &*storage.as_ptr();
1669 assert_eq!(task.global_id, 42);
1670 assert_eq!(task.leaf_idx, 1);
1671 assert_eq!(task.signal_idx, 2);
1672 assert_eq!(task.slot_idx, 3);
1673 assert_eq!(task.signal_bit, 4);
1674 assert_eq!(task.state.load(Ordering::Relaxed), TASK_IDLE);
1675 assert!(!task.yielded.load(Ordering::Relaxed));
1676 assert!(task.future_ptr.load(Ordering::Relaxed).is_null());
1677 ptr::drop_in_place(storage.as_mut_ptr());
1678 ptr::drop_in_place(slot_storage.as_mut_ptr());
1679 }
1680 }
1681
1682 #[test]
1683 #[cfg(feature = "disabled_tests")] fn task_bind_arena_sets_pointer() {
1685 let mut storage = MaybeUninit::<Task>::uninit();
1686 let mut slot_storage = MaybeUninit::<TaskSlot>::uninit();
1687 let signal = TaskSignal::new();
1688 let arena = setup_arena(1, 64);
1689 unsafe {
1690 slot_storage
1691 .as_mut_ptr()
1692 .write(TaskSlot::new(storage.as_mut_ptr()));
1693 Task::construct(
1694 storage.as_mut_ptr(),
1695 5,
1696 0,
1697 0,
1698 0,
1699 0,
1700 &signal as *const _,
1701 slot_storage.as_mut_ptr(),
1702 );
1703 let task = &*storage.as_ptr();
1704 assert!(task.arena_ptr.load(Ordering::Relaxed).is_null());
1705 task.bind_arena(Arc::as_ptr(&arena));
1706 assert_eq!(
1707 task.arena_ptr.load(Ordering::Relaxed),
1708 Arc::as_ptr(&arena) as *mut ExecutorArena
1709 );
1710 ptr::drop_in_place(storage.as_mut_ptr());
1711 ptr::drop_in_place(slot_storage.as_mut_ptr());
1712 }
1713 }
1714
1715 #[cfg(feature = "disabled_tests")]
1719 mod needs_worker_service {
1720 use super::*;
1721
1722 #[test]
1723 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1724 fn task_schedule_is_idempotent() {
1725 let arena = setup_arena(1, 64);
1726 let handle = arena.reserve_task().expect("reserve task");
1727 let global = handle.global_id(arena.tasks_per_leaf());
1728 arena.init_task(global);
1729 let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1730 let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1731 let signal = unsafe { &*task.signal_ptr };
1732
1733 task.schedule();
1734 let summary_after_first = arena
1735 .active_summary(handle.leaf_idx())
1736 .load(Ordering::Relaxed);
1737 assert!(summary_after_first & (1 << handle.signal_idx()) != 0);
1738
1739 task.schedule();
1740 let summary_after_second = arena
1741 .active_summary(handle.leaf_idx())
1742 .load(Ordering::Relaxed);
1743 assert_eq!(summary_after_first, summary_after_second);
1744 assert_eq!(signal.load(Ordering::Relaxed), 1 << task.signal_bit);
1745
1746 arena.deactivate_task(handle);
1747 arena.release_task(handle);
1748 }
1749
1750 #[test]
1751 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1752 fn task_begin_overwrites_state() {
1753 let arena = setup_arena(1, 64);
1754 let handle = arena.reserve_task().expect("reserve task");
1755 let global = handle.global_id(arena.tasks_per_leaf());
1756 arena.init_task(global);
1757 let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1758 let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1759
1760 task.schedule();
1761 task.begin();
1762 assert_eq!(task.state.load(Ordering::Relaxed), TASK_EXECUTING);
1763
1764 arena.deactivate_task(handle);
1765 arena.release_task(handle);
1766 }
1767
1768 #[test]
1769 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1770 fn task_finish_without_new_schedule_clears_signal() {
1771 let arena = setup_arena(1, 64);
1772 let handle = arena.reserve_task().expect("reserve task");
1773 let leaf = handle.leaf_idx();
1774 let signal_idx = handle.signal_idx();
1775 let bit_idx = handle.bit_idx();
1776 let global = handle.global_id(arena.tasks_per_leaf());
1777 arena.init_task(global);
1778 let slot_idx = signal_idx * 64 + bit_idx as usize;
1779 let task = unsafe { arena.task(leaf, slot_idx) };
1780 let signal = unsafe { &*task.signal_ptr };
1781
1782 task.schedule();
1783 let (remaining, acquired) = signal.try_acquire(bit_idx);
1784 assert!(acquired);
1785 if remaining == 0 {
1786 arena.active_tree().mark_signal_inactive(leaf, signal_idx);
1787 }
1788 task.begin();
1789 task.finish();
1790
1791 assert_eq!(task.state.load(Ordering::Relaxed), TASK_IDLE);
1792 assert_eq!(signal.load(Ordering::Relaxed), 0);
1793 assert_eq!(
1794 arena.active_summary(leaf).load(Ordering::Relaxed) & (1 << signal_idx),
1795 0
1796 );
1797
1798 arena.release_task(handle);
1799 }
1800
1801 #[test]
1802 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1803 fn task_finish_and_schedule_sets_signal() {
1804 let arena = setup_arena(1, 64);
1805 let handle = arena.reserve_task().expect("reserve task");
1806 let leaf = handle.leaf_idx();
1807 let signal_idx = handle.signal_idx();
1808 let bit_idx = handle.bit_idx();
1809 let global = handle.global_id(arena.tasks_per_leaf());
1810 arena.init_task(global);
1811 let slot_idx = signal_idx * 64 + bit_idx as usize;
1812 let task = unsafe { arena.task(leaf, slot_idx) };
1813 let signal = unsafe { &*task.signal_ptr };
1814
1815 task.finish_and_schedule();
1816 assert_eq!(task.state.load(Ordering::Relaxed), TASK_SCHEDULED);
1817 assert!(signal.is_set(task.signal_bit));
1818 assert_ne!(
1819 arena.active_summary(leaf).load(Ordering::Relaxed) & (1 << signal_idx),
1820 0
1821 );
1822
1823 arena.deactivate_task(handle);
1824 arena.release_task(handle);
1825 }
1826
1827 #[test]
1828 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1829 fn task_clear_yielded_and_is_yielded() {
1830 let arena = setup_arena(1, 64);
1831 let handle = arena.reserve_task().expect("reserve task");
1832 let global = handle.global_id(arena.tasks_per_leaf());
1833 arena.init_task(global);
1834 let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1835 let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1836
1837 task.yielded.store(true, Ordering::Relaxed);
1838 assert!(task.is_yielded());
1839 task.clear_yielded();
1840 assert!(!task.is_yielded());
1841
1842 arena.release_task(handle);
1843 }
1844
1845 #[test]
1846 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1847 fn task_attach_future_rejects_second_future() {
1848 let arena = setup_arena(1, 64);
1849 let handle = arena.reserve_task().expect("reserve task");
1850 let global = handle.global_id(arena.tasks_per_leaf());
1851 arena.init_task(global);
1852 let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1853 let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1854
1855 let first_ptr = FutureAllocator::box_future(async {});
1856 task.attach_future(first_ptr).unwrap();
1857 let second_ptr = FutureAllocator::box_future(async {});
1858 let existing = task.attach_future(second_ptr).unwrap_err();
1859 assert_eq!(existing, first_ptr);
1860 unsafe { FutureAllocator::drop_boxed(second_ptr) };
1861
1862 let ptr = task.take_future().unwrap();
1863 unsafe { FutureAllocator::drop_boxed(ptr) };
1864 arena.release_task(handle);
1865 }
1866
1867 #[test]
1868 #[ignore = "Needs WorkerService helper - uses reserve_task()"]
1869 fn task_take_future_clears_pointer() {
1870 let arena = setup_arena(1, 64);
1871 let handle = arena.reserve_task().expect("reserve task");
1872 let global = handle.global_id(arena.tasks_per_leaf());
1873 arena.init_task(global);
1874 let slot_idx = handle.signal_idx() * 64 + handle.bit_idx() as usize;
1875 let task = unsafe { arena.task(handle.leaf_idx(), slot_idx) };
1876
1877 let future_ptr = FutureAllocator::box_future(async {});
1878 task.attach_future(future_ptr).unwrap();
1879 let returned = task.take_future().unwrap();
1880 assert_eq!(returned, future_ptr);
1881 assert!(task.take_future().is_none());
1882 unsafe { FutureAllocator::drop_boxed(returned) };
1883 arena.release_task(handle);
1884 }
1885
1886 #[test]
1919 #[ignore = "Needs WorkerService helper - uses active_tree()"]
1920 fn reserve_task_in_leaf_exhaustion() {
1921 let arena = setup_arena(1, 64);
1922 let mut bits = Vec::with_capacity(64);
1923 for _ in 0..64 {
1924 let bit = arena
1925 .active_tree()
1926 .reserve_task_in_leaf(0, 0)
1927 .expect("expected available bit");
1928 bits.push(bit);
1929 }
1930 assert!(arena.active_tree().reserve_task_in_leaf(0, 0).is_none());
1931 for bit in &bits {
1932 arena.active_tree().release_task_in_leaf(0, 0, *bit);
1933 }
1934 }
1935
1936 #[test]
1937 #[ignore = "Needs WorkerService helper - uses active_tree()"]
1938 fn reserve_task_in_leaf_after_release() {
1939 let arena = setup_arena(1, 64);
1940 let bit = arena
1941 .active_tree()
1942 .reserve_task_in_leaf(0, 0)
1943 .expect("expected bit");
1944 arena.active_tree().release_task_in_leaf(0, 0, bit);
1945 let new_bit = arena
1946 .active_tree()
1947 .reserve_task_in_leaf(0, 0)
1948 .expect("bit after release");
1949 arena.active_tree().release_task_in_leaf(0, 0, new_bit);
1950 }
1951 } }