pub use crate::c_ffi::dtact_handle_t;
pub use crate::common_types::{TopologyMode, WorkloadKind};
pub use crate::memory_management::{ContextPool, FiberContext, FiberStatus, SafetyLevel};
use core::future::Future;
use core::pin::Pin;
pub use topology::Affinity;
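/// Scheduling priority hint recorded by `SpawnBuilder::priority`.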
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
Low,
Normal,
High,
Critical,
}
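/// Selects the low-level register save/restore routine used when a fiber
/// suspends. The four marker types below pick cross-thread vs. same-thread
/// switching, with or without floating-point state.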
pub trait ContextSwitcher: Send + Sync + 'static {
const SWITCH_FN: unsafe extern "C" fn(
*mut crate::memory_management::Registers,
*const crate::memory_management::Registers,
);
}
pub struct CrossThreadFloat;
impl ContextSwitcher for CrossThreadFloat {
const SWITCH_FN: unsafe extern "C" fn(
*mut crate::memory_management::Registers,
*const crate::memory_management::Registers,
) = crate::context_switch::switch_context_cross_thread_float;
}
pub struct CrossThreadNoFloat;
impl ContextSwitcher for CrossThreadNoFloat {
const SWITCH_FN: unsafe extern "C" fn(
*mut crate::memory_management::Registers,
*const crate::memory_management::Registers,
) = crate::context_switch::switch_context_cross_thread_no_float;
}
pub struct SameThreadFloat;
impl ContextSwitcher for SameThreadFloat {
const SWITCH_FN: unsafe extern "C" fn(
*mut crate::memory_management::Registers,
*const crate::memory_management::Registers,
) = crate::context_switch::switch_context_same_thread_float;
}
pub struct SameThreadNoFloat;
impl ContextSwitcher for SameThreadNoFloat {
const SWITCH_FN: unsafe extern "C" fn(
*mut crate::memory_management::Registers,
*const crate::memory_management::Registers,
) = crate::context_switch::switch_context_same_thread_no_float;
}
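/// Builder for spawning a future as a fiber with explicit placement, workload,
/// and safety settings. Defaults match `spawn`: `CrossThreadFloat` switching,
/// `Affinity::SameCore`, `Priority::Normal`, `WorkloadKind::Compute`,
/// `TopologyMode::P2PMesh`, and `SafetyLevel::Safety0`.
///
/// Illustrative sketch only; it assumes the global runtime has already been
/// initialized elsewhere:
///
/// ```ignore
/// let handle = SpawnBuilder::new()
///     .name("io-worker")
///     .kind(WorkloadKind::IO)
///     .priority(Priority::High)
///     .switcher::<SameThreadNoFloat>()
///     .spawn(async { /* ... */ });
/// ```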
pub struct SpawnBuilder<S: ContextSwitcher = CrossThreadFloat> {
name: Option<&'static str>,
affinity: topology::Affinity,
priority: Priority,
kind: WorkloadKind,
mode: TopologyMode,
safety: crate::memory_management::SafetyLevel,
_marker: core::marker::PhantomData<S>,
}
impl<S: ContextSwitcher> Default for SpawnBuilder<S> {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
impl<S: ContextSwitcher> SpawnBuilder<S> {
#[inline(always)]
#[must_use]
pub const fn new() -> Self {
Self {
name: None,
affinity: topology::Affinity::SameCore,
priority: Priority::Normal,
kind: WorkloadKind::Compute,
mode: TopologyMode::P2PMesh,
safety: crate::memory_management::SafetyLevel::Safety0,
_marker: core::marker::PhantomData,
}
}
#[inline(always)]
#[must_use]
pub const fn kind(mut self, kind: WorkloadKind) -> Self {
self.kind = kind;
self
}
#[inline(always)]
#[must_use]
pub const fn topology_mode(mut self, mode: TopologyMode) -> Self {
self.mode = mode;
self
}
#[inline(always)]
#[must_use]
pub const fn safety(mut self, safety: crate::memory_management::SafetyLevel) -> Self {
self.safety = safety;
self
}
#[inline(always)]
#[must_use]
pub const fn name(mut self, name: &'static str) -> Self {
self.name = Some(name);
self
}
#[inline(always)]
#[must_use]
pub const fn affinity(mut self, affinity: topology::Affinity) -> Self {
self.affinity = affinity;
self
}
#[inline(always)]
#[must_use]
pub const fn priority(mut self, priority: Priority) -> Self {
self.priority = priority;
self
}
#[inline(always)]
#[must_use]
pub const fn switcher<NewS: ContextSwitcher>(self) -> SpawnBuilder<NewS> {
SpawnBuilder {
name: self.name,
affinity: self.affinity,
priority: self.priority,
kind: self.kind,
mode: self.mode,
safety: self.safety,
_marker: core::marker::PhantomData,
}
}
#[inline(always)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::useless_let_if_seq)]
#[allow(clippy::too_many_lines)]
pub fn spawn<F: Future + Send + 'static>(self, fut: F) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let mut fixed_spins: u32 = 0;
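// Acquire a context slot. If the pool is exhausted, spin: plain threads use a
// fixed bounded spin with periodic retries, while callers already running on a
// fiber use that fiber's adaptive spin counters and, once those give up, mark
// themselves `Notified` and switch back to the executor before retrying.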
let ctx_id = 'alloc: loop {
if let Some(id) = pool.alloc_context() {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if !ctx_ptr.is_null() {
unsafe {
let ctx = &mut *ctx_ptr;
ctx.adaptive_spin_count = (ctx.adaptive_spin_count + 1).min(2000);
ctx.spin_failure_count = ctx.spin_failure_count.saturating_sub(1);
}
}
break 'alloc id;
}
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if ctx_ptr.is_null() {
if fixed_spins < 2000 {
core::hint::spin_loop();
fixed_spins += 1;
if fixed_spins.trailing_zeros() >= 3
&& let Some(id) = pool.alloc_context()
{
break 'alloc id;
}
} else {
std::thread::yield_now();
fixed_spins = 0;
}
} else {
unsafe {
let ctx = &mut *ctx_ptr;
let current_spin = ctx.adaptive_spin_count;
let failure_count = ctx.spin_failure_count;
if failure_count < 20 {
for i in 0..current_spin {
core::hint::spin_loop();
if i.trailing_zeros() >= 3
&& let Some(id) = pool.alloc_context()
{
ctx.adaptive_spin_count = (current_spin + 2).min(2000);
ctx.spin_failure_count = failure_count.saturating_sub(1);
break 'alloc id;
}
}
}
ctx.spin_failure_count = failure_count.saturating_add(1);
ctx.adaptive_spin_count = current_spin.saturating_sub(100).max(200);
ctx.state.store(
crate::memory_management::FiberStatus::Notified as u8,
core::sync::atomic::Ordering::Release,
);
(ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
}
}
};
let ctx_ptr = pool.get_context_ptr(ctx_id);
let current_core = crate::future_bridge::CURRENT_WORKER_ID.with(|c| {
let id = c.get();
if id < runtime.scheduler.workers.len() {
id
} else {
topology::current().core_id as usize % runtime.scheduler.workers.len()
}
});
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u8,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).kind = self.kind;
(*ctx_ptr).mode = self.mode;
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = S::SWITCH_FN;
(*ctx_ptr).adaptive_spin_count = match self.kind {
WorkloadKind::Compute => 1000,
WorkloadKind::IO => 100,
WorkloadKind::Memory => 500,
WorkloadKind::System => 200,
};
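// Zero-copy placement: try to store the future at the aligned top of the
// fiber's 8 KiB buffer and use the space below it as the fiber stack. Futures
// that do not fit (or cannot be aligned) are boxed on the heap instead, and
// the whole buffer becomes stack.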
let align = core::mem::align_of::<F>();
let fut_size = core::mem::size_of::<F>();
let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
let buffer_end = buffer_start + 8192;
let aligned_fut_addr = (buffer_end - fut_size) & !(align - 1);
let stack_limit: usize;
if aligned_fut_addr < buffer_start || (aligned_fut_addr + fut_size) > buffer_end {
crate::HEAP_ESCAPED_SPAWNS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
#[cfg(debug_assertions)]
{
static WARNED: core::sync::atomic::AtomicBool =
core::sync::atomic::AtomicBool::new(false);
if !WARNED.swap(true, core::sync::atomic::Ordering::Relaxed) {
eprintln!(
"DTA-V3 WARNING: Future exceeds or misaligns 8KB zero-copy buffer. Switching to heap-allocation mode."
);
}
}
let boxed = Box::new(fut);
let fut_ptr = Box::into_raw(boxed);
(*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();
(*ctx_ptr).invoke_closure = |ptr| unsafe {
let mut f = Box::from_raw(ptr.cast::<F>());
let f_pinned = Pin::new_unchecked(&mut *f);
crate::future_bridge::wait_pinned(f_pinned);
};
(*ctx_ptr).cleanup_fn = None;
stack_limit = buffer_end;
} else {
let fut_ptr = aligned_fut_addr as *mut F;
core::ptr::write(fut_ptr, fut);
(*ctx_ptr).invoke_closure = |ptr| {
let f_ptr = ptr.cast::<F>();
unsafe {
let f_pinned = Pin::new_unchecked(&mut *f_ptr);
crate::future_bridge::wait_pinned(f_pinned);
core::ptr::drop_in_place(f_ptr);
}
};
(*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();
stack_limit = aligned_fut_addr;
}
let stack_top = (stack_limit & !0xF) - 8;
let stack_top_ptr = stack_top as *mut u64;
core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
#[cfg(target_arch = "x86_64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[7] = fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
(*ctx_ptr).regs.gprs[10] = stack_limit as u64;
(*ctx_ptr).regs.gprs[11] = buffer_start as u64;
(*ctx_ptr).regs.gprs[12] = buffer_start as u64;
(*ctx_ptr).regs.gprs[13] = !0;
}
}
#[cfg(target_arch = "aarch64")]
{
(*ctx_ptr).regs.gprs[12] = stack_top as u64;
(*ctx_ptr).regs.gprs[11] = fiber_entry_point as u64;
#[cfg(windows)]
{
(*ctx_ptr).regs.gprs[13] = stack_limit as u64;
(*ctx_ptr).regs.gprs[14] = buffer_start as u64;
(*ctx_ptr).regs.gprs[15] = buffer_start as u64;
}
}
#[cfg(target_arch = "riscv64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[13] = fiber_entry_point as u64;
}
}
let r#gen = u64::from(unsafe {
(*ctx_ptr)
.generation
.load(core::sync::atomic::Ordering::Acquire)
});
crate::wake_fiber(current_core, ctx_id);
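// Handle layout: bits 0-31 = context index, bits 32-47 = origin worker,
// bits 48-63 = low 16 bits of the generation counter, with bit 63 additionally
// forced to 1 (decoders strip it again with `& !(1 << 63)`).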
dtact_handle_t(
u64::from(ctx_id)
| ((current_core as u64) << 32)
| ((r#gen & 0xFFFF) << 48)
| (1 << 63),
)
}
}
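/// First frame executed on a freshly prepared fiber stack: runs the stored
/// closure (catching panics), runs any cleanup hook, marks the fiber
/// `Finished`, reschedules a fiber recorded in `waiter_handle`, wakes futex
/// waiters on the state word, and switches back to the executor.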
pub(crate) unsafe extern "C" fn fiber_entry_point() {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if ctx_ptr.is_null() {
return;
}
let ctx = unsafe { &mut *ctx_ptr };
let invoke = ctx.invoke_closure;
let arg = ctx.closure_ptr;
let _ = std::panic::catch_unwind(core::panic::AssertUnwindSafe(move || {
unsafe { invoke(arg) };
}));
if let Some(cleanup) = ctx.cleanup_fn.take() {
unsafe { cleanup(ctx.closure_ptr) };
}
ctx.state.store(
crate::memory_management::FiberStatus::Finished as u8,
core::sync::atomic::Ordering::Release,
);
let waiter = ctx
.waiter_handle
.swap(0, core::sync::atomic::Ordering::AcqRel);
if waiter != 0 {
let waiter = waiter & !(1 << 63);
let waiter_ctx_id = (waiter & 0xFFFF_FFFF) as u32;
let target_worker = (waiter >> 32) as usize;
if let Some(runtime) = crate::GLOBAL_RUNTIME.get() {
let num_workers = runtime.scheduler.workers.len();
let target_worker = target_worker % num_workers;
let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);
if current_worker == target_worker {
unsafe {
let worker = &mut *runtime.scheduler.workers[target_worker].get();
worker.push_local(waiter_ctx_id);
}
} else if current_worker < num_workers {
let mut chunk = crate::dta_scheduler::TaskChunk::default();
chunk.tasks[0] = waiter_ctx_id;
chunk.count = 1;
let _ = runtime.scheduler.mailboxes[current_worker][target_worker].push(chunk);
} else {
let _ = runtime.scheduler.enqueue_task(
target_worker,
u64::from(waiter_ctx_id),
waiter_ctx_id,
);
}
}
}
unsafe { crate::utils::futex_wake(&raw const ctx.state) };
unsafe {
(ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
}
}
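/// Incremented whenever a thread observes that it has migrated to a different
/// CPU location (see `topology::current`).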
pub static TOPOLOGY_EPOCH: core::sync::atomic::AtomicU64 = core::sync::atomic::AtomicU64::new(0);
pub mod topology {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Affinity {
SameCore,
SameCCX,
SameNUMA,
Any,
}
#[inline(always)]
#[must_use]
pub fn current_core() -> u16 {
current().core_id
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CpuLevel {
pub core_id: u16,
pub ccx_id: u16,
pub numa_id: u16,
}
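/// Returns the caller's CPU location, cached per thread and refreshed only
/// when roughly 100,000 ticks have elapsed or the CPU id reported alongside
/// the tick no longer matches the cached core.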
#[inline(always)]
pub fn current() -> CpuLevel {
thread_local! {
static CACHED: core::cell::Cell<(CpuLevel, u64)> = const {
core::cell::Cell::new((CpuLevel { core_id: 0, ccx_id: 0, numa_id: 0 }, 0))
};
}
let (mut cpu, mut last_refresh) = CACHED.with(std::cell::Cell::get);
let (now, cpu_id) = crate::utils::get_tick_with_cpu();
if now.wrapping_sub(last_refresh) > 100_000 || u32::from(cpu.core_id) != cpu_id {
let next_cpu = current_raw();
if next_cpu != cpu {
crate::TOPOLOGY_EPOCH.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
cpu = next_cpu;
}
last_refresh = now;
CACHED.with(|c| c.set((cpu, last_refresh)));
}
cpu
}
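/// Queries the hardware directly: CPUID leaf 0x0B on x86/x86_64, `MPIDR_EL1`
/// on aarch64, `mhartid` on riscv64; other targets report all zeros.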
#[inline(always)]
#[must_use]
pub fn current_raw() -> CpuLevel {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
{
let (x2apic_id, core_shift, package_shift): (u32, u32, u32);
unsafe {
let (mut eax, mut edx_v): (u32, u32);
core::arch::asm!(
"push rbx",
"cpuid",
"mov {ebx_out:e}, ebx",
"pop rbx",
ebx_out = out(reg) _,
inout("eax") 0x0B => eax,
inout("ecx") 0 => _,
out("edx") edx_v,
);
core_shift = eax;
x2apic_id = edx_v;
let eax_p: u32;
core::arch::asm!(
"push rbx",
"cpuid",
"mov {ebx_out:e}, ebx",
"pop rbx",
ebx_out = out(reg) _,
inout("eax") 0x0B => eax_p,
inout("ecx") 1 => _,
out("edx") _,
);
package_shift = eax_p;
}
let core_id = x2apic_id & ((1 << core_shift) - 1);
let ccx_id = (x2apic_id >> core_shift) & ((1 << (package_shift - core_shift)) - 1);
let numa_id = x2apic_id >> package_shift;
CpuLevel {
core_id: (core_id & 0xFFFF) as u16,
ccx_id: (ccx_id & 0xFFFF) as u16,
numa_id: (numa_id & 0xFFFF) as u16,
}
}
#[cfg(target_arch = "aarch64")]
{
let mut mpidr: u64;
unsafe {
core::arch::asm!("mrs {}, mpidr_el1", out(reg) mpidr, options(nomem, nostack, preserves_flags));
}
return CpuLevel {
core_id: (mpidr & 0xFF) as u16,
ccx_id: ((mpidr >> 8) & 0xFF) as u16,
numa_id: ((mpidr >> 16) & 0xFF) as u16,
};
}
#[cfg(target_arch = "riscv64")]
{
let mut hart_id: u64;
unsafe {
core::arch::asm!("csrr {}, mhartid", out(reg) hart_id, options(nomem, nostack, preserves_flags));
}
return CpuLevel {
core_id: (hart_id & 0xFFFF) as u16,
ccx_id: (hart_id >> 16) as u16,
numa_id: 0,
};
}
#[cfg(not(any(
target_arch = "x86",
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "riscv64"
)))]
{
CpuLevel {
core_id: 0,
ccx_id: 0,
numa_id: 0,
}
}
}
}
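/// Spawns a future as a fiber using the `SpawnBuilder` defaults (cross-thread
/// switching with floating-point state).
///
/// Illustrative sketch only; it assumes the global runtime has already been
/// initialized elsewhere:
///
/// ```ignore
/// let handle = spawn(async { yield_now().await });
/// ```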
#[inline(always)]
pub fn spawn<F: Future + Send + 'static>(fut: F) -> dtact_handle_t {
SpawnBuilder::<CrossThreadFloat>::new().spawn(fut)
}
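/// Returns a `SpawnBuilder` with default settings for fluent configuration
/// before calling `SpawnBuilder::spawn`.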
#[inline(always)]
#[must_use]
pub const fn spawn_with() -> SpawnBuilder<CrossThreadFloat> {
SpawnBuilder::new()
}
#[doc(hidden)]
pub mod spawn {
use super::{CrossThreadFloat, SpawnBuilder};
#[inline(always)]
#[must_use]
#[doc(hidden)]
pub const fn builder() -> SpawnBuilder<CrossThreadFloat> {
SpawnBuilder::new()
}
}
pub mod fiber {
use super::{dtact_handle_t, topology};
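/// Spawns a plain closure as a fiber. The `_stack_size_str` argument is
/// currently ignored; the fiber stack is carved out of the context's pool
/// slot.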
#[inline]
#[allow(clippy::cast_possible_truncation)]
pub fn spawn_with_stack<F: FnOnce() + Send + 'static>(
_stack_size_str: &str,
f: F,
) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
let ctx_ptr = pool.get_context_ptr(ctx_id);
#[allow(clippy::cast_possible_truncation)]
let current_core = topology::current().core_id as usize;
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u8,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = crate::context_switch::switch_context_same_thread_no_float;
let f_ptr = (*ctx_ptr).read_buffer_ptr.cast::<F>();
core::ptr::write(f_ptr, f);
(*ctx_ptr).invoke_closure = |ptr| {
let f = core::ptr::read(ptr.cast::<F>());
f();
};
(*ctx_ptr).closure_ptr = f_ptr.cast::<()>();
let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
let stack_top = (buffer_start & !0xF) - 72;
let stack_top_ptr = stack_top as *mut u64;
core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
#[cfg(target_arch = "x86_64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[7] = super::fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
let limit = buffer_start.saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[10] = buffer_start as u64;
(*ctx_ptr).regs.gprs[11] = limit as u64;
(*ctx_ptr).regs.gprs[12] = limit as u64;
(*ctx_ptr).regs.gprs[13] = !0;
}
}
#[cfg(target_arch = "aarch64")]
{
(*ctx_ptr).regs.gprs[12] = stack_top as u64;
(*ctx_ptr).regs.gprs[11] = super::fiber_entry_point as u64;
#[cfg(windows)]
{
let limit = buffer_start.saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[13] = buffer_start as u64;
(*ctx_ptr).regs.gprs[14] = limit as u64;
(*ctx_ptr).regs.gprs[15] = limit as u64;
}
}
#[cfg(target_arch = "riscv64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[13] = super::fiber_entry_point as u64;
}
}
crate::wake_fiber(current_core, ctx_id);
dtact_handle_t(u64::from(ctx_id) | ((current_core as u64) << 32))
}
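/// Wakes the fiber identified by `handle`, marks the current fiber `Yielded`,
/// and switches back to the executor. Does nothing when called outside a
/// fiber.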
#[inline(always)]
pub fn yield_to(handle: dtact_handle_t) {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if ctx_ptr.is_null() {
return;
}
let target_ctx_id = (handle.0 & 0xFFFF_FFFF) as u32;
let target_core_id = ((handle.0 >> 32) & 0xFFFF) as usize;
crate::wake_fiber(target_core_id, target_ctx_id);
unsafe {
let ctx = &mut *ctx_ptr;
ctx.state.store(
crate::memory_management::FiberStatus::Yielded as u8,
core::sync::atomic::Ordering::Release,
);
(ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
}
}
}
#[cfg(feature = "hw-acceleration")]
pub mod hw {
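/// Cache-line demotion/clean hint for the line containing `ptr`: `cldemote` on
/// x86/x86_64, `dc cvac` on aarch64, `cbo.clean` on riscv64.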
#[inline(always)]
pub fn cldemote<T>(ptr: *const T) {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
unsafe {
core::arch::asm!("cldemote [{}]", in(reg) ptr);
}
#[cfg(target_arch = "aarch64")]
unsafe {
core::arch::asm!("dc cvac, {}", in(reg) ptr);
}
#[cfg(target_arch = "riscv64")]
unsafe {
core::arch::asm!("cbo.clean 0({0})", in(reg) ptr);
}
}
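/// Best-effort cross-core signal: on x86_64 the raw byte sequence encodes
/// `SENDUIPI` with the target index in `rax`, aarch64 issues `sev`, and
/// riscv64 writes a user-interrupt CSR.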
#[inline(always)]
pub fn uintr_signal(target_cpu: usize) {
#[cfg(target_arch = "x86_64")]
unsafe {
core::arch::asm!(
"mov rax, {}",
".byte 0xf3, 0x0f, 0xc7, 0xf0",
in(reg) target_cpu as u64,
out("rax") _,
options(nostack, preserves_flags),
);
}
#[cfg(target_arch = "aarch64")]
unsafe {
core::arch::asm!("sev", options(nostack, preserves_flags));
}
#[cfg(target_arch = "riscv64")]
unsafe {
core::arch::asm!("csrw uipi, {0}", in(reg) target_cpu);
}
}
}
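/// Cooperatively yields once: the first poll wakes the task and returns
/// `Pending`, the second poll returns `Ready`.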
#[inline(always)]
pub async fn yield_now() {
struct YieldNow(bool);
impl Future for YieldNow {
type Output = ();
#[inline(always)]
fn poll(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if self.0 {
core::task::Poll::Ready(())
} else {
self.0 = true;
cx.waker().wake_by_ref();
core::task::Poll::Pending
}
}
}
YieldNow(false).await;
}
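/// Wakes the fiber behind `handle` (with the validity bit stripped), then
/// yields the current task once so the woken fiber can be scheduled.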
#[inline(always)]
pub async fn yield_to(handle: dtact_handle_t) {
let handle_val = handle.0 & !(1 << 63);
let target_ctx_id = (handle_val & 0xFFFF_FFFF) as u32;
let target_core_id = ((handle_val >> 32) & 0xFFFF) as usize;
crate::wake_fiber(target_core_id, target_ctx_id);
yield_now().await;
}
pub mod config {
use core::sync::atomic::Ordering;
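/// Sets the work-deflection threshold of a single worker. Does nothing when
/// the runtime is not initialized or `core_id` is out of range.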
#[inline(always)]
pub fn set_deflection_threshold(core_id: usize, threshold: u8) {
if let Some(runtime) = crate::GLOBAL_RUNTIME.get()
&& core_id < runtime.scheduler.workers.len()
{
unsafe {
let worker = &*runtime.scheduler.workers[core_id].get();
worker
.deflection_threshold
.store(threshold, Ordering::Release);
}
}
}
}
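/// Extension trait that synchronously waits on any future via
/// `crate::future_bridge::wait`.
///
/// Illustrative sketch only; `fetch_value` stands in for any future-returning
/// call:
///
/// ```ignore
/// let value = fetch_value().wait();
/// ```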
pub trait DtactWaitExt {
type Output;
fn wait(self) -> Self::Output;
}
impl<F: Future> DtactWaitExt for F {
type Output = F::Output;
#[inline(always)]
fn wait(self) -> Self::Output {
crate::future_bridge::wait(self)
}
}