use alloc::vec::Vec;
#[allow(unused_imports)]
use core::arch::asm;
use core::cell::UnsafeCell;
use core::sync::atomic::{AtomicU8, AtomicU32, AtomicUsize, Ordering};
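/// Index identifying a fiber context inside `crate::memory_management::ContextPool`.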
pub type TaskIndex = u32;
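/// Maximum number of task indices carried by a single `TaskChunk`.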
pub const CHUNK_SIZE: usize = 32;
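/// Number of chunk slots per mailbox; must stay a power of two so the derived mask can wrap ring indices.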
pub const MAILBOX_CAPACITY: usize = 1024;
pub const MAILBOX_MASK: usize = MAILBOX_CAPACITY - 1;
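/// Per-worker local queue capacity in task indices; also a power of two so the mask below wraps indices.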
pub const LOCAL_QUEUE_CAPACITY: usize = 131_072;
pub const LOCAL_QUEUE_MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
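/// A small batch of task indices handed between cores in one mailbox slot;
/// only the first `count` entries of `tasks` are valid.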
#[derive(Debug, Clone, Copy)]
pub struct TaskChunk {
pub tasks: [TaskIndex; CHUNK_SIZE],
pub count: usize,
}
impl Default for TaskChunk {
#[inline(always)]
fn default() -> Self {
Self {
tasks: [0; CHUNK_SIZE],
count: 0,
}
}
}
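/// A manually managed, zero-initialized buffer holding exactly one `T`.
///
/// On Unix it is backed by `mmap` (requesting huge pages for sizes of 2 MiB
/// and up, falling back to the global allocator if the mapping fails); on
/// Windows it uses `VirtualAlloc`, with large pages when the `windows-root`
/// feature is enabled (large pages typically require extra privileges).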
#[allow(dead_code)]
pub struct HugeBuffer<T> {
ptr: *mut T,
size_bytes: usize,
is_mmap: bool,
}
unsafe impl<T> Send for HugeBuffer<T> {}
unsafe impl<T> Sync for HugeBuffer<T> {}
impl<T> Default for HugeBuffer<T> {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
impl<T> HugeBuffer<T> {
#[inline]
#[must_use]
pub fn new() -> Self {
let size_bytes = core::mem::size_of::<T>();
#[cfg(unix)]
unsafe {
let mut flags = libc::MAP_PRIVATE | libc::MAP_ANONYMOUS;
// 0x40000 is MAP_HUGETLB on Linux: request huge pages for mappings of
// 2 MiB and larger. If the kernel rejects it, mmap fails and we fall
// back to the plain allocator below.
if size_bytes >= 2 * 1024 * 1024 {
flags |= 0x40000;
}
let ptr = libc::mmap(
core::ptr::null_mut(),
size_bytes,
libc::PROT_READ | libc::PROT_WRITE,
flags,
-1,
0,
);
if ptr == libc::MAP_FAILED {
let layout = std::alloc::Layout::from_size_align(size_bytes, 64).unwrap();
let alloc_ptr = std::alloc::alloc_zeroed(layout);
assert!(!alloc_ptr.is_null(), "HugeBuffer std::alloc failed");
Self {
ptr: alloc_ptr.cast::<T>(),
size_bytes,
is_mmap: false,
}
} else {
// Anonymous mappings are already zero-filled; this write mainly
// pre-faults the pages so first use does not take page faults.
core::ptr::write_bytes(ptr, 0, size_bytes);
Self {
ptr: ptr.cast::<T>(),
size_bytes,
is_mmap: true,
}
}
}
#[cfg(windows)]
unsafe {
use windows_sys::Win32::System::Memory;
#[cfg(feature = "windows-root")]
{
let mut ptr = Memory::VirtualAlloc(
core::ptr::null_mut(),
size_bytes,
Memory::MEM_RESERVE | Memory::MEM_COMMIT | Memory::MEM_LARGE_PAGES,
Memory::PAGE_READWRITE,
);
if ptr.is_null() {
ptr = Memory::VirtualAlloc(
core::ptr::null_mut(),
size_bytes,
Memory::MEM_RESERVE | Memory::MEM_COMMIT,
Memory::PAGE_READWRITE,
);
assert!(!ptr.is_null(), "HugeBuffer VirtualAlloc failed");
}
Self {
ptr: ptr.cast::<T>(),
size_bytes,
is_mmap: false,
}
}
#[cfg(not(feature = "windows-root"))]
{
let ptr = Memory::VirtualAlloc(
core::ptr::null_mut(),
size_bytes,
Memory::MEM_RESERVE | Memory::MEM_COMMIT,
Memory::PAGE_READWRITE,
);
assert!(!ptr.is_null(), "HugeBuffer VirtualAlloc failed");
Self {
ptr: ptr as *mut T,
size_bytes,
is_mmap: false,
}
}
}
}
}
impl<T> Drop for HugeBuffer<T> {
#[inline(always)]
fn drop(&mut self) {
#[cfg(unix)]
unsafe {
if self.is_mmap {
libc::munmap(self.ptr.cast::<libc::c_void>(), self.size_bytes);
} else {
let layout = std::alloc::Layout::from_size_align(self.size_bytes, 64).unwrap();
std::alloc::dealloc(self.ptr.cast::<u8>(), layout);
}
}
#[cfg(windows)]
unsafe {
windows_sys::Win32::System::Memory::VirtualFree(
self.ptr.cast::<core::ffi::c_void>(),
0,
windows_sys::Win32::System::Memory::MEM_RELEASE,
);
}
}
}
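/// Fixed-capacity single-producer / single-consumer ring of `TaskChunk`s.
/// `head` and `tail` sit on separate cache lines to avoid false sharing;
/// the slot array lives in a `HugeBuffer` so large rings can use huge pages.
///
/// Illustrative sketch (not compiled as a doctest):
/// ```ignore
/// let mb = Mailbox::new();
/// let mut chunk = TaskChunk::default();
/// chunk.tasks[0] = 7;
/// chunk.count = 1;
/// assert!(mb.push(chunk).is_ok());
/// assert_eq!(mb.pop().unwrap().count, 1);
/// ```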
#[repr(align(64))]
pub struct Mailbox {
pub head: AtomicUsize,
_pad1: [u8; 64 - core::mem::size_of::<AtomicUsize>()],
pub tail: AtomicUsize,
_pad2: [u8; 64 - core::mem::size_of::<AtomicUsize>()],
pub buffer: HugeBuffer<UnsafeCell<[TaskChunk; MAILBOX_CAPACITY]>>,
}
unsafe impl Sync for Mailbox {}
unsafe impl Send for Mailbox {}
impl Default for Mailbox {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
impl Mailbox {
#[inline(always)]
#[must_use]
pub fn new() -> Self {
Self {
head: AtomicUsize::new(0),
_pad1: [0; 64 - core::mem::size_of::<AtomicUsize>()],
tail: AtomicUsize::new(0),
_pad2: [0; 64 - core::mem::size_of::<AtomicUsize>()],
buffer: HugeBuffer::new(),
}
}
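/// Enqueue one chunk, handing it back as `Err` when the ring is full.
/// Producer side of the SPSC ring: concurrent producers must serialize
/// externally (see `DtaScheduler::external_locks`).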
#[inline(always)]
#[allow(clippy::result_large_err)]
pub fn push(&self, chunk: TaskChunk) -> Result<(), TaskChunk> {
let current_tail = self.tail.load(Ordering::Relaxed);
let next_tail = (current_tail + 1) & MAILBOX_MASK;
if next_tail == self.head.load(Ordering::Acquire) {
return Err(chunk);
}
unsafe {
let buffer_ptr = (*self.buffer.ptr).get().cast::<TaskChunk>();
*buffer_ptr.add(current_tail) = chunk;
}
self.tail.store(next_tail, Ordering::Release);
// Optionally hint the CPU to push the freshly written line toward a cache
// level closer to the consumer (cldemote / cache clean), reducing the
// consumer's latency on the next pop.
#[cfg(all(
feature = "hw-acceleration",
any(target_arch = "x86", target_arch = "x86_64")
))]
unsafe {
core::arch::asm!("cldemote [{}]", in(reg) &raw const self.tail);
}
#[cfg(all(feature = "hw-acceleration", target_arch = "aarch64"))]
unsafe {
core::arch::asm!("dc cvac, {}", in(reg) &raw const self.tail);
}
#[cfg(all(feature = "hw-acceleration", target_arch = "riscv64"))]
unsafe {
core::arch::asm!("cbo.clean 0({0})", in(reg) &raw const self.tail);
}
Ok(())
}
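/// Dequeue the oldest chunk, or `None` when the mailbox is empty.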
#[inline(always)]
pub fn pop(&self) -> Option<TaskChunk> {
let current_head = self.head.load(Ordering::Relaxed);
// Cheap relaxed check first to keep the empty path free of an acquire load.
if current_head == self.tail.load(Ordering::Relaxed) {
return None;
}
// Acquire re-check so the chunk read below observes the producer's write.
if current_head == self.tail.load(Ordering::Acquire) {
return None;
}
let chunk = unsafe {
let buffer_ptr = (*self.buffer.ptr).get().cast::<TaskChunk>();
core::ptr::read(buffer_ptr.add(current_head))
};
let next_head = (current_head + 1) & MAILBOX_MASK;
self.head.store(next_head, Ordering::Release);
Some(chunk)
}
}
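/// Placement of a worker in the machine topology: core, core complex (CCX),
/// and NUMA node.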
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CpuLevel {
pub core_id: u16,
pub ccx_id: u16,
pub numa_id: u16,
}
pub use crate::common_types::TopologyMode;
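/// Per-core scheduler state, cache-line aligned. The local ring of task
/// indices is only written by its owning worker thread; other cores deliver
/// work through the scheduler's mailbox matrix instead.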
#[repr(align(64))]
pub struct Worker {
pub cpu: CpuLevel,
pub load_level: AtomicU8,
pub deflection_threshold: AtomicU8,
pub event_signal: AtomicU32,
pub local_queue: HugeBuffer<[TaskIndex; LOCAL_QUEUE_CAPACITY]>,
pub local_head: AtomicUsize,
pub local_tail: AtomicUsize,
pub ticks: u64,
pub polling_order: Vec<usize>,
}
unsafe impl Sync for Worker {}
unsafe impl Send for Worker {}
impl Worker {
#[inline(always)]
#[must_use]
#[allow(clippy::cast_possible_truncation)]
pub fn new(cpu: CpuLevel, total_cores: usize) -> Self {
// Build the polling order: same-CCX peers first (cheapest to poll), then
// everyone else. CCX membership mirrors `DtaScheduler::new`, which groups
// cores in blocks of 8.
let mut polling_order = Vec::with_capacity(total_cores.saturating_sub(1));
let my_core = cpu.core_id as usize;
let my_ccx = cpu.ccx_id;
for i in 0..total_cores {
if i != my_core && (i / 8) as u16 == my_ccx {
polling_order.push(i);
}
}
for i in 0..total_cores {
if i != my_core && (i / 8) as u16 != my_ccx {
polling_order.push(i);
}
}
Self {
cpu,
load_level: AtomicU8::new(0),
deflection_threshold: AtomicU8::new(80),
local_queue: HugeBuffer::new(),
local_head: AtomicUsize::new(0),
local_tail: AtomicUsize::new(0),
ticks: 0,
event_signal: AtomicU32::new(0),
polling_order,
}
}
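/// Number of task indices currently sitting in the local ring.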
#[inline(always)]
pub fn local_queue_len(&self) -> usize {
let head = self.local_head.load(core::sync::atomic::Ordering::Acquire);
let tail = self.local_tail.load(core::sync::atomic::Ordering::Acquire);
tail.wrapping_sub(head) & LOCAL_QUEUE_MASK
}
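/// Refresh `load_level` on a 0–100 scale; it saturates at 100 once the
/// local queue holds 8192 tasks (`(len * 100) >> 13`).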
#[inline(always)]
pub fn update_load(&self) {
let queue_len = self.local_queue_len();
#[allow(clippy::cast_possible_truncation)]
let load = core::cmp::min((queue_len * 100) >> 13, 100) as u8;
self.load_level.store(load, Ordering::Relaxed);
}
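/// Advance the tick counter and, every 1024 ticks, adapt the deflection
/// threshold: lower it (down to 40) under heavy load, raise it (up to 95)
/// when mostly idle.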
#[inline(always)]
pub fn tick(&mut self) {
self.ticks = self.ticks.wrapping_add(1);
if self.ticks.trailing_zeros() >= 10 {
let load = self.load_level.load(Ordering::Relaxed);
let current_thresh = self.deflection_threshold.load(Ordering::Relaxed);
let new_thresh = if load > 90 {
current_thresh.saturating_sub(5).max(40)
} else if load < 30 {
current_thresh.saturating_add(5).min(95)
} else {
current_thresh
};
self.deflection_threshold
.store(new_thresh, Ordering::Relaxed);
}
}
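/// Push one task onto the local ring from the owning worker thread;
/// returns `false` if the ring is full.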
#[inline(always)]
pub fn push_local(&self, task: TaskIndex) -> bool {
let tail = self.local_tail.load(Ordering::Relaxed);
if self.local_queue_len() >= LOCAL_QUEUE_CAPACITY - 1 {
return false;
}
unsafe {
let buffer_ptr = self.local_queue.ptr.cast::<TaskIndex>();
*buffer_ptr.add(tail) = task;
}
self.local_tail
.store((tail + 1) & LOCAL_QUEUE_MASK, Ordering::Release);
true
}
#[inline(always)]
pub fn push_batch(&mut self, chunk: &TaskChunk) {
let count = chunk.count;
let tail = self.local_tail.load(Ordering::Relaxed);
// There is no overflow check on this path: the caller must leave room
// for the whole chunk (see `poll_mailboxes`).
debug_assert!(self.local_queue_len() + count < LOCAL_QUEUE_CAPACITY);
let end_idx = tail.wrapping_add(count);
if end_idx <= LOCAL_QUEUE_CAPACITY {
unsafe {
core::ptr::copy_nonoverlapping(
chunk.tasks.as_ptr(),
(*self.local_queue.ptr).as_mut_ptr().add(tail),
count,
);
}
} else {
let first_part = LOCAL_QUEUE_CAPACITY - tail;
let second_part = count - first_part;
unsafe {
core::ptr::copy_nonoverlapping(
chunk.tasks.as_ptr(),
(*self.local_queue.ptr).as_mut_ptr().add(tail),
first_part,
);
core::ptr::copy_nonoverlapping(
chunk.tasks.as_ptr().add(first_part),
(*self.local_queue.ptr).as_mut_ptr(),
second_part,
);
}
}
self.local_tail.store(
end_idx & LOCAL_QUEUE_MASK,
core::sync::atomic::Ordering::Release,
);
}
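/// Drain the local ring, switching into each task's fiber and handling its
/// post-run state (free finished/panicked fibers, requeue notified ones).
///
/// # Safety
/// Every queued task index must refer to a live context in `pool`, and the
/// pointers returned by `pool.get_context_ptr` must remain valid across the
/// context switch.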
#[inline(always)]
pub unsafe fn dispatch_loop(&self, pool: &crate::memory_management::ContextPool) {
let mut head = self.local_head.load(Ordering::Acquire);
while head != self.local_tail.load(Ordering::Acquire) {
let task = unsafe {
let buffer_ptr = self.local_queue.ptr.cast::<TaskIndex>();
*buffer_ptr.add(head)
};
head = (head + 1) & LOCAL_QUEUE_MASK;
self.local_head
.store(head, core::sync::atomic::Ordering::Release);
let target_ptr = pool.get_context_ptr(task);
#[cfg(target_arch = "x86_64")]
unsafe {
// Prefetch the next context before switching into it (0 == _MM_HINT_NTA).
core::arch::x86_64::_mm_prefetch::<0>(target_ptr as *const i8);
}
#[cfg(target_arch = "aarch64")]
unsafe {
core::arch::asm!("prfm pldl1keep, [{0}]", in(reg) target_ptr, options(nostack, preserves_flags));
}
#[cfg(all(target_arch = "riscv64", feature = "hw-acceleration"))]
unsafe {
core::arch::asm!("prefetch.r 0({0})", in(reg) target_ptr, options(nostack, preserves_flags));
}
crate::future_bridge::CURRENT_FIBER.with(|c| c.set(target_ptr));
unsafe {
((*target_ptr).switch_fn)(
&raw mut (*target_ptr).executor_regs,
&raw const (*target_ptr).regs,
);
}
crate::future_bridge::CURRENT_FIBER.with(|c| c.set(core::ptr::null_mut()));
let post_state = unsafe {
(*target_ptr)
.state
.load(core::sync::atomic::Ordering::Acquire)
};
let mut final_state = post_state;
if post_state == crate::memory_management::FiberStatus::Suspending as u32 {
match unsafe {
(*target_ptr).state.compare_exchange(
crate::memory_management::FiberStatus::Suspending as u32,
crate::memory_management::FiberStatus::Yielded as u32,
core::sync::atomic::Ordering::Release,
core::sync::atomic::Ordering::Acquire,
)
} {
Ok(_) => final_state = crate::memory_management::FiberStatus::Yielded as u32,
Err(actual) => final_state = actual,
}
}
if final_state == crate::memory_management::FiberStatus::Finished as u32
|| final_state == crate::memory_management::FiberStatus::Panicked as u32
{
pool.free_context(task);
} else if final_state == crate::memory_management::FiberStatus::Notified as u32 {
// The fiber was woken while it was still running: requeue it locally
// and hand control back to the caller instead of draining further.
self.push_local(task);
return;
}
}
}
}
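/// Topology-aware scheduler: one `Worker` per core, a per-(source, target)
/// SPSC mailbox matrix for cross-core handoff, plus one locked "external"
/// mailbox per core for non-worker threads.
///
/// Illustrative usage sketch (not compiled as a doctest):
/// ```ignore
/// let sched = DtaScheduler::new(8, TopologyMode::Global);
/// // Enqueue task index 0 from core 0 with flow id 42.
/// let accepted = sched.enqueue_task(0, 42, 0);
/// ```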
pub struct DtaScheduler {
pub workers: Vec<UnsafeCell<Worker>>,
pub mailboxes: Vec<Vec<Mailbox>>,
pub external_mailboxes: Vec<Mailbox>,
pub external_locks: Vec<crate::utils::SpinLock>,
pub topology: TopologyMode,
#[allow(clippy::type_complexity)]
pub enqueue_jmp: [fn(&Self, usize, usize, TaskIndex) -> bool; 2],
}
unsafe impl Sync for DtaScheduler {}
unsafe impl Send for DtaScheduler {}
impl DtaScheduler {
#[inline(always)]
#[must_use]
pub fn new(num_workers: usize, topology: TopologyMode) -> Self {
let mut workers = Vec::with_capacity(num_workers);
let mut mailboxes = Vec::with_capacity(num_workers);
let mut external_mailboxes = Vec::with_capacity(num_workers);
let mut external_locks = Vec::with_capacity(num_workers);
for i in 0..num_workers {
#[allow(clippy::cast_possible_truncation)]
workers.push(UnsafeCell::new(Worker::new(
CpuLevel {
core_id: i as u16,
ccx_id: (i / 8) as u16,
numa_id: (i / 64) as u16,
},
num_workers,
)));
let mut row = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
row.push(Mailbox::new());
}
mailboxes.push(row);
external_mailboxes.push(Mailbox::new());
external_locks.push(crate::utils::SpinLock::new());
}
Self {
workers,
mailboxes,
external_mailboxes,
external_locks,
topology,
enqueue_jmp: [Self::do_push_local, Self::do_push_remote],
}
}
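/// Bump the target worker's event counter and wake it through the futex.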
#[inline(always)]
fn signal_worker(&self, target_core: usize) {
unsafe {
let worker = &*self.workers[target_core].get();
let order = core::sync::atomic::Ordering::Release;
worker.event_signal.fetch_add(1, order);
crate::utils::futex_wake(
(&raw const worker.event_signal).cast::<core::sync::atomic::AtomicU32>(),
);
}
}
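/// Same-core path: push straight onto the worker's own ring when called
/// from that worker's thread, otherwise spin on the core's locked external
/// mailbox until the chunk is accepted.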
#[inline(always)]
fn do_push_local(&self, source_core: usize, target_core: usize, task: TaskIndex) -> bool {
let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);
if current_worker == source_core {
unsafe {
let worker = &*self.workers[source_core].get();
if worker.push_local(task) {
return true;
}
}
}
loop {
self.external_locks[target_core].lock();
let mut chunk = TaskChunk::default();
chunk.tasks[0] = task;
chunk.count = 1;
let res = self.external_mailboxes[target_core].push(chunk);
self.external_locks[target_core].unlock();
if res.is_ok() {
self.signal_worker(target_core);
return true;
}
core::hint::spin_loop();
}
}
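/// Cross-core path: deliver a single-task chunk through the per-pair SPSC
/// mailbox (or the locked external mailbox when called from a non-worker
/// thread), signal the target, and optionally poke it with a hardware
/// user-interrupt / event hint.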
#[inline(always)]
fn do_push_remote(&self, _source_core: usize, target_core: usize, task: TaskIndex) -> bool {
let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);
let mut retries = 0u32;
loop {
let success = if current_worker < self.workers.len() {
let mut chunk = TaskChunk::default();
chunk.tasks[0] = task;
chunk.count = 1;
self.mailboxes[current_worker][target_core]
.push(chunk)
.is_ok()
} else {
self.external_locks[target_core].lock();
let mut chunk = TaskChunk::default();
chunk.tasks[0] = task;
chunk.count = 1;
let success = self.external_mailboxes[target_core].push(chunk).is_ok();
self.external_locks[target_core].unlock();
success
};
if success {
self.signal_worker(target_core);
break;
}
retries = retries.saturating_add(1);
if retries > 1024 {
std::thread::yield_now();
} else {
core::hint::spin_loop();
}
}
#[cfg(all(
feature = "hw-acceleration",
any(target_arch = "x86", target_arch = "x86_64")
))]
unsafe {
// F3 0F C7 F0 encodes SENDUIPI RAX (user inter-processor interrupt),
// emitted as raw bytes rather than relying on assembler support.
core::arch::asm!(
"mov rax, {}",
".byte 0xf3, 0x0f, 0xc7, 0xf0",
in(reg) target_core as u64,
out("rax") _,
options(nostack, preserves_flags),
);
}
#[cfg(all(feature = "hw-acceleration", target_arch = "aarch64"))]
unsafe {
core::arch::asm!("sev", options(nostack, preserves_flags));
}
#[cfg(all(feature = "hw-acceleration", target_arch = "riscv64"))]
unsafe {
core::arch::asm!("csrw uipi, {0}", in(reg) target_core);
}
true
}
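/// Route `task` toward a core derived from `flow_id`. With
/// `TopologyMode::Global` the hash always picks a target across all workers;
/// otherwise deflection stays inside the 8-core CCX and only triggers when
/// the source worker's load exceeds its adaptive threshold. Returns `true`
/// once the task has been queued.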
#[inline(always)]
#[must_use]
pub fn enqueue_task(&self, source_core: usize, flow_id: u64, task: TaskIndex) -> bool {
let num_workers = self.workers.len();
let source_core = source_core % num_workers;
let worker_ref = unsafe { &*self.workers[source_core].get() };
let threshold = worker_ref.deflection_threshold.load(Ordering::Relaxed);
let load = worker_ref.load_level.load(Ordering::Relaxed);
let deflect_mask = if load > threshold { usize::MAX } else { 0 };
#[allow(clippy::cast_possible_truncation)]
let h1 = (flow_id & 7) as usize;
#[allow(clippy::cast_possible_truncation)]
let h2 = (((flow_id >> 3) & 7) | 1) as usize; // forced odd so the deflection stride is never zero
let target_core = if self.topology == TopologyMode::Global {
(source_core + h1 + h2) % num_workers
} else {
let ccx_base = source_core & !7;
let local_idx = source_core & 7;
let deflect_target = (local_idx + h1 + h2) & 7;
let target_idx = local_idx ^ ((local_idx ^ deflect_target) & deflect_mask);
(ccx_base | target_idx) % num_workers
};
let jump_idx = usize::from(target_core != source_core);
(self.enqueue_jmp[jump_idx])(self, source_core, target_core, task)
}
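/// Drain every peer mailbox addressed to `current_core` (same-CCX peers
/// first, per the polling order), then the external mailbox, and refresh
/// the worker's load and threshold bookkeeping.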
#[inline(always)]
pub fn poll_mailboxes(&self, current_core: usize) {
let worker = unsafe { &mut *self.workers[current_core].get() };
let num_polls = worker.polling_order.len();
for idx in 0..num_polls {
let i = worker.polling_order[idx];
let row = &self.mailboxes[i];
while let Some(chunk) = row[current_core].pop() {
worker.push_batch(&chunk);
}
}
while let Some(chunk) = self.external_mailboxes[current_core].pop() {
worker.push_batch(&chunk);
}
worker.update_load();
worker.tick();
}
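/// Worker main loop: dispatch local tasks, poll mailboxes, and back off
/// from spinning through architectural pause hints to monitor/wait or a
/// futex sleep as the worker stays idle. Returns once `shutdown` is set.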
#[inline]
#[allow(clippy::too_many_lines)]
pub fn run_worker_static(
scheduler: &Self,
current_core: usize,
pool: &crate::memory_management::ContextPool,
shutdown: &core::sync::atomic::AtomicBool,
) {
crate::future_bridge::CURRENT_WORKER_ID.with(|c| c.set(current_core));
let mut idle_count: u32 = 0;
loop {
if shutdown.load(core::sync::atomic::Ordering::Acquire) {
return;
}
let mut activity = false;
unsafe {
let worker = &*scheduler.workers[current_core].get();
let head_before = worker.local_head.load(Ordering::Acquire);
worker.dispatch_loop(pool);
if worker.local_head.load(Ordering::Acquire) != head_before {
activity = true;
}
}
let q_len_before =
unsafe { (&*scheduler.workers[current_core].get()).local_queue_len() };
scheduler.poll_mailboxes(current_core);
let q_len_after =
unsafe { (&*scheduler.workers[current_core].get()).local_queue_len() };
if q_len_after > q_len_before {
activity = true;
}
if activity {
idle_count = 0;
continue;
}
idle_count = idle_count.saturating_add(1);
if idle_count < 256 {
core::hint::spin_loop();
continue;
}
if idle_count < 2048 {
#[cfg(target_arch = "aarch64")]
unsafe {
core::arch::asm!("yield", options(nostack, preserves_flags));
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
core::hint::spin_loop();
#[cfg(all(feature = "hw-acceleration", target_arch = "riscv64"))]
unsafe {
core::arch::asm!("pause", options(nostack, preserves_flags));
}
#[cfg(not(any(
target_arch = "aarch64",
target_arch = "x86",
target_arch = "x86_64",
all(feature = "hw-acceleration", target_arch = "riscv64")
)))]
for _ in 0..8 {
core::hint::spin_loop();
}
continue;
}
unsafe {
let worker = &*scheduler.workers[current_core].get();
let signal_before = worker
.event_signal
.load(core::sync::atomic::Ordering::Acquire);
#[cfg(all(feature = "hw-acceleration", target_arch = "aarch64"))]
core::arch::asm!("wfe", options(nostack, preserves_flags));
#[cfg(all(feature = "hw-acceleration", target_arch = "riscv64"))]
core::arch::asm!("pause", options(nostack, preserves_flags));
#[cfg(all(
feature = "hw-acceleration",
any(target_arch = "x86_64", target_arch = "x86")
))]
{
// UMONITOR/UMWAIT: arm a monitor on the signal word, re-read it
// *after* arming, and only wait if it is still unchanged. UMWAIT takes
// an absolute TSC deadline in EDX:EAX, so the deadline is computed as
// `rdtsc + delay` inside the asm block. `control` bit 0 = 1 requests
// the lighter C0.1 sleep state. Note: cmp/umwait modify flags, so
// `preserves_flags` must not be claimed here.
let sig_ptr = &raw const worker.event_signal as *const core::ffi::c_void;
let control = 1u32;
let delay_tsc = 2_000_000u32;
core::arch::asm!(
"rdtsc",
"add eax, {delay:e}",
"adc edx, 0",
"umonitor {ptr}",
"mov {cur:e}, dword ptr [{ptr}]",
"cmp {cur:e}, {before:e}",
"jne 2f",
"umwait {ctl:e}",
"2:",
ptr = in(reg) sig_ptr,
before = in(reg) signal_before,
ctl = in(reg) control,
delay = in(reg) delay_tsc,
cur = out(reg) _,
out("eax") _,
out("edx") _,
options(nostack),
);
}
#[cfg(any(target_arch = "aarch64", target_arch = "riscv64"))]
core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
scheduler.poll_mailboxes(current_core);
let head = worker
.local_head
.load(core::sync::atomic::Ordering::Acquire);
let tail = worker
.local_tail
.load(core::sync::atomic::Ordering::Acquire);
if head == tail {
crate::utils::futex_wait(&raw const worker.event_signal, signal_before);
}
#[cfg(not(feature = "hw-acceleration"))]
{
if idle_count > 10000 {
std::thread::yield_now();
idle_count = 2048;
} else {
core::hint::spin_loop();
}
}
}
if idle_count > 20000 {
idle_count = 10000;
}
}
}
}