use crate::dta_scheduler::TopologyMode;
use crate::memory_management::SafetyLevel;
use core::ffi::c_void;
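/// Opaque fiber handle returned by the `dtact_fiber_launch*` family.
///
/// Bit layout: bits 0..32 hold the context id, bits 32..48 the origin core,
/// bits 48..63 a 15-bit generation counter, and bit 63 is a validity flag.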
#[allow(non_camel_case_types)]
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct dtact_handle_t(pub u64);
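/// C-visible runtime configuration consumed by `dtact_init`.
///
/// `workers`, `fiber_capacity`, and `stack_size` treat 0 as "use the
/// default" (detected parallelism, 4096 fibers, 512 KiB stacks).
/// `safety_level` maps 0 and 2 to `Safety0`/`Safety2` and everything else
/// to `Safety1`; `topology_mode` maps 1 to `Global` and everything else to
/// `P2PMesh`.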
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct dtact_config_t {
pub workers: u32,
pub safety_level: u8,
pub topology_mode: u8,
pub fiber_capacity: u32,
pub stack_size: u32,
}
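/// Per-spawn options for the `_ext` launch entry points.
///
/// `kind` selects the workload class (0 = Compute, 1 = IO, 2 = Memory,
/// 3 = System); `switcher` selects the context-switch routine (0 =
/// cross-thread float, 1 = cross-thread no-float, 2 = same-thread float,
/// 3 = same-thread no-float). `priority` and `affinity` are carried for the
/// caller but are not consumed by this FFI layer.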
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct dtact_spawn_options_t {
pub priority: u8,
pub affinity: u8,
pub kind: u8,
pub switcher: u8,
}
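/// Returns the baseline spawn options: priority 1, affinity 0, Compute
/// workload, and the cross-thread float-preserving switcher.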
#[unsafe(no_mangle)]
pub const extern "C" fn dtact_default_spawn_options() -> dtact_spawn_options_t {
dtact_spawn_options_t {
        priority: 1,
        affinity: 0,
        kind: 0,
        switcher: 0,
    }
}
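/// Returns the default configuration: auto-sized fields left at 0 (resolved
/// inside `dtact_init`) and `safety_level` 1.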
#[unsafe(no_mangle)]
pub const extern "C" fn dtact_default_config() -> dtact_config_t {
dtact_config_t {
        workers: 0,
        safety_level: 1,
        topology_mode: 0,
        fiber_capacity: 0,
        stack_size: 0,
    }
}
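/// Initializes the process-global runtime from `cfg`. The runtime is a
/// process singleton, so no handle is needed: this always returns null, and
/// only the first call performs initialization.
///
/// # Safety
/// `cfg` must be null (the defaults are used) or point to a valid, readable
/// `dtact_config_t`.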
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dtact_init(cfg: *const dtact_config_t) -> *mut c_void {
let cfg = unsafe { &*cfg };
let workers = if cfg.workers == 0 {
std::thread::available_parallelism().map_or(1, std::num::NonZero::get)
} else {
cfg.workers as usize
};
let safety = match cfg.safety_level {
0 => SafetyLevel::Safety0,
2 => SafetyLevel::Safety2,
_ => SafetyLevel::Safety1,
};
let topology = match cfg.topology_mode {
1 => TopologyMode::Global,
_ => TopologyMode::P2PMesh,
};
let capacity = if cfg.fiber_capacity == 0 {
4096
} else {
cfg.fiber_capacity
};
let stack_size = if cfg.stack_size == 0 {
512 * 1024
} else {
cfg.stack_size as usize
};
crate::GLOBAL_RUNTIME.get_or_init(|| {
let scheduler = crate::dta_scheduler::DtaScheduler::new(workers, topology);
let pool = crate::memory_management::ContextPool::new(capacity, stack_size, safety, 0)
.expect("DTA-V3 FFI Initialization Failed");
crate::Runtime {
scheduler,
pool,
started: core::sync::atomic::AtomicBool::new(false),
shutdown: core::sync::atomic::AtomicBool::new(false),
}
});
core::ptr::null_mut()
}
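/// Sentinel landing pad written at the top of every fiber stack: a fiber
/// that `return`s from its entry function instead of yielding back to the
/// runtime "returns" here, and the process aborts rather than executing on
/// a corrupted stack.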
#[unsafe(no_mangle)]
pub extern "C" fn dtact_abort() -> ! {
eprintln!("DTA-V3 Critical: Fiber attempted to 'return' instead of yielding. Stack corrupted.");
std::process::abort();
}
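/// Frees an argument buffer allocated with the C allocator. Null pointers
/// are ignored.
///
/// # Safety
/// `arg` must be null or a live pointer obtained from `malloc`/`calloc`/
/// `realloc` that has not already been freed.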
#[unsafe(no_mangle)]
pub unsafe extern "C" fn dtact_free_arg(arg: *mut c_void) {
if !arg.is_null() {
unsafe {
libc::free(arg);
}
}
}
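/// Spawns a fiber running `func(arg)` with default options, wakes it on the
/// calling core, and returns its handle.
///
/// # Safety
/// The runtime must already be initialized via `dtact_init`, `func` must be
/// a valid function pointer, and `arg` must stay valid until the fiber has
/// finished with it. Panics if the context pool is exhausted.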
#[unsafe(no_mangle)]
#[allow(clippy::cast_possible_truncation)]
pub unsafe extern "C" fn dtact_fiber_launch(
func: extern "C" fn(*mut c_void),
arg: *mut c_void,
) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
let ctx_ptr = pool.get_context_ptr(ctx_id);
    let current_core = crate::api::topology::current().core_id as usize;
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u32,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = crate::context_switch::switch_context_cross_thread_float;
(*ctx_ptr).closure_ptr = arg.cast::<()>();
(*ctx_ptr).trampoline =
core::mem::transmute::<extern "C" fn(*mut c_void), unsafe extern "C" fn()>(func);
(*ctx_ptr).invoke_closure = |ptr| {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if let Some(ctx) = unsafe { ctx_ptr.as_ref() } {
let f: extern "C" fn(*mut c_void) = unsafe { core::mem::transmute(ctx.trampoline) };
f(ptr.cast::<c_void>());
}
};
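        // The context header sits at the top of its pool slot and the fiber
        // stack grows down from just below it: align down to 16 bytes and
        // reserve the switcher's frame (72 bytes on x86, 80 elsewhere,
        // presumably matching the assembly switch routine). On x86 the word
        // at the stack top holds `dtact_abort`, so a fiber that `ret`s
        // instead of yielding lands there.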
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top = (ctx_ptr as usize & !0xF) - 72;
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
let stack_top = (ctx_ptr as usize & !0xF) - 80;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top_ptr = stack_top as *mut u64;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
core::ptr::write(stack_top_ptr, dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
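        // Seed the saved register file so the first switch into this fiber
        // resumes at `fiber_entry_point` on the fresh stack. The gpr indices
        // are the save-area slots used by the per-arch assembly switchers
        // (stack pointer and resume address); on Windows the extra slots
        // presumably mirror the TEB stack base/limit bookkeeping.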
#[cfg(target_arch = "x86_64")]
{
            (*ctx_ptr).regs.gprs[0] = stack_top as u64;
            (*ctx_ptr).regs.gprs[7] = crate::api::fiber_entry_point as *const () as u64;
            #[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
                (*ctx_ptr).regs.gprs[10] = ctx_ptr as u64;
                (*ctx_ptr).regs.gprs[11] = limit as u64;
                (*ctx_ptr).regs.gprs[12] = limit as u64;
                (*ctx_ptr).regs.gprs[13] = !0;
            }
}
#[cfg(target_arch = "aarch64")]
{
            (*ctx_ptr).regs.gprs[12] = stack_top as u64;
            (*ctx_ptr).regs.gprs[11] = crate::api::fiber_entry_point as *const () as u64;
            #[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
                (*ctx_ptr).regs.gprs[13] = ctx_ptr as u64;
                (*ctx_ptr).regs.gprs[14] = limit as u64;
                (*ctx_ptr).regs.gprs[15] = limit as u64;
            }
}
#[cfg(target_arch = "riscv64")]
{
            (*ctx_ptr).regs.gprs[0] = stack_top as u64;
            (*ctx_ptr).regs.gprs[13] = crate::api::fiber_entry_point as *const () as u64;
        }
(*ctx_ptr).cleanup_fn = None;
}
let r#gen = u64::from(unsafe {
(*ctx_ptr)
.generation
.load(core::sync::atomic::Ordering::Acquire)
});
crate::wake_fiber(current_core, ctx_id);
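    // Pack the handle: context id in bits 0..32, origin core in bits 32..48,
    // 15 generation bits in 48..63, and bit 63 as the validity flag.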
dtact_handle_t(
u64::from(ctx_id) | ((current_core as u64) << 32) | ((r#gen & 0x7FFF) << 48) | (1 << 63),
)
}
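/// Spawns a fiber like `dtact_fiber_launch`, but honors `options` for the
/// workload kind and the context-switch routine. A null `options` falls
/// back to `dtact_default_spawn_options`.
///
/// # Safety
/// Same contract as `dtact_fiber_launch`; additionally, `options` must be
/// null or point to a valid `dtact_spawn_options_t`.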
#[unsafe(no_mangle)]
#[allow(clippy::cast_possible_truncation)]
pub unsafe extern "C" fn dtact_fiber_launch_ext(
func: extern "C" fn(*mut c_void),
arg: *mut c_void,
options: *const dtact_spawn_options_t,
) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
let ctx_ptr = pool.get_context_ptr(ctx_id);
let current_core = crate::api::topology::current().core_id as usize;
let opts = if options.is_null() {
dtact_default_spawn_options()
} else {
unsafe { *options }
};
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u32,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = match opts.switcher {
1 => crate::context_switch::switch_context_cross_thread_no_float,
2 => crate::context_switch::switch_context_same_thread_float,
3 => crate::context_switch::switch_context_same_thread_no_float,
_ => crate::context_switch::switch_context_cross_thread_float,
};
(*ctx_ptr).kind = match opts.kind {
1 => crate::common_types::WorkloadKind::IO,
2 => crate::common_types::WorkloadKind::Memory,
3 => crate::common_types::WorkloadKind::System,
_ => crate::common_types::WorkloadKind::Compute,
};
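        // Per-kind adaptive spin budget (presumably iterations before the
        // scheduler backs off): compute-bound fibers spin longest, IO-bound
        // ones the least.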
(*ctx_ptr).adaptive_spin_count = match (*ctx_ptr).kind {
crate::common_types::WorkloadKind::Compute => 1000,
crate::common_types::WorkloadKind::IO => 100,
crate::common_types::WorkloadKind::Memory => 500,
crate::common_types::WorkloadKind::System => 200,
};
(*ctx_ptr).closure_ptr = arg.cast::<()>();
(*ctx_ptr).trampoline =
core::mem::transmute::<extern "C" fn(*mut c_void), unsafe extern "C" fn()>(func);
(*ctx_ptr).invoke_closure = |ptr| {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if let Some(ctx) = unsafe { ctx_ptr.as_ref() } {
let f: extern "C" fn(*mut c_void) = unsafe { core::mem::transmute(ctx.trampoline) };
f(ptr.cast::<c_void>());
}
};
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top = (ctx_ptr as usize & !0xF) - 72;
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
let stack_top = (ctx_ptr as usize & !0xF) - 80;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top_ptr = stack_top as *mut u64;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
core::ptr::write(stack_top_ptr, dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
#[cfg(target_arch = "x86_64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[7] = crate::api::fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[10] = ctx_ptr as u64;
(*ctx_ptr).regs.gprs[11] = limit as u64;
(*ctx_ptr).regs.gprs[12] = limit as u64;
(*ctx_ptr).regs.gprs[13] = !0;
}
}
#[cfg(target_arch = "aarch64")]
{
(*ctx_ptr).regs.gprs[12] = stack_top as u64;
(*ctx_ptr).regs.gprs[11] = crate::api::fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[13] = ctx_ptr as u64;
(*ctx_ptr).regs.gprs[14] = limit as u64;
(*ctx_ptr).regs.gprs[15] = limit as u64;
}
}
#[cfg(target_arch = "riscv64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[13] = crate::api::fiber_entry_point as *const () as u64;
}
(*ctx_ptr).cleanup_fn = None;
}
let r#gen = u64::from(unsafe {
(*ctx_ptr)
.generation
.load(core::sync::atomic::Ordering::Acquire)
});
crate::wake_fiber(current_core, ctx_id);
dtact_handle_t(
u64::from(ctx_id) | ((current_core as u64) << 32) | ((r#gen & 0x7FFF) << 48) | (1 << 63),
)
}
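/// Spawns a fiber like `dtact_fiber_launch` and registers `cleanup` as the
/// context's cleanup hook, which the runtime presumably invokes with the
/// fiber's argument once it finishes, letting callers reclaim heap-allocated
/// argument buffers.
///
/// # Safety
/// Same contract as `dtact_fiber_launch`; `cleanup` must be a valid function
/// pointer that is safe to call with `arg` exactly once.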
#[unsafe(no_mangle)]
#[allow(clippy::cast_possible_truncation)]
pub unsafe extern "C" fn dtact_fiber_launch_with_cleanup(
func: extern "C" fn(*mut c_void),
arg: *mut c_void,
cleanup: unsafe extern "C" fn(*mut c_void),
) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
let ctx_ptr = pool.get_context_ptr(ctx_id);
    let current_core = crate::api::topology::current().core_id as usize;
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u32,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = crate::context_switch::switch_context_cross_thread_float;
(*ctx_ptr).closure_ptr = arg.cast::<()>();
(*ctx_ptr).trampoline =
core::mem::transmute::<extern "C" fn(*mut c_void), unsafe extern "C" fn()>(func);
(*ctx_ptr).cleanup_fn = Some(core::mem::transmute::<
unsafe extern "C" fn(*mut c_void),
unsafe extern "C" fn(*mut ()),
>(cleanup));
(*ctx_ptr).invoke_closure = |ptr| {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if let Some(ctx) = unsafe { ctx_ptr.as_ref() } {
let f: extern "C" fn(*mut c_void) = unsafe {
core::mem::transmute::<unsafe extern "C" fn(), extern "C" fn(*mut c_void)>(
ctx.trampoline,
)
};
f(ptr.cast::<c_void>());
}
};
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top = (ctx_ptr as usize & !0xF) - 72;
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
let stack_top = (ctx_ptr as usize & !0xF) - 80;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top_ptr = stack_top as *mut u64;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
core::ptr::write(stack_top_ptr, dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
#[cfg(target_arch = "x86_64")]
{
            (*ctx_ptr).regs.gprs[0] = stack_top as u64;
            (*ctx_ptr).regs.gprs[7] = crate::api::fiber_entry_point as *const () as u64;
            #[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
                (*ctx_ptr).regs.gprs[10] = ctx_ptr as u64;
                (*ctx_ptr).regs.gprs[11] = limit as u64;
                (*ctx_ptr).regs.gprs[12] = limit as u64;
                (*ctx_ptr).regs.gprs[13] = !0;
            }
}
#[cfg(target_arch = "aarch64")]
{
            (*ctx_ptr).regs.gprs[12] = stack_top as u64;
            (*ctx_ptr).regs.gprs[11] = crate::api::fiber_entry_point as *const () as u64;
            #[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
                (*ctx_ptr).regs.gprs[13] = ctx_ptr as u64;
                (*ctx_ptr).regs.gprs[14] = limit as u64;
                (*ctx_ptr).regs.gprs[15] = limit as u64;
            }
}
#[cfg(target_arch = "riscv64")]
{
            (*ctx_ptr).regs.gprs[0] = stack_top as u64;
            (*ctx_ptr).regs.gprs[13] = crate::api::fiber_entry_point as *const () as u64;
        }
}
let r#gen = u64::from(unsafe {
(*ctx_ptr)
.generation
.load(core::sync::atomic::Ordering::Acquire)
});
crate::wake_fiber(current_core, ctx_id);
dtact_handle_t(
u64::from(ctx_id) | ((current_core as u64) << 32) | ((r#gen & 0x7FFF) << 48) | (1 << 63),
)
}
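/// Combines `dtact_fiber_launch_with_cleanup` and `dtact_fiber_launch_ext`:
/// registers a cleanup hook and honors spawn `options` (null selects the
/// defaults).
///
/// # Safety
/// Same contract as `dtact_fiber_launch_with_cleanup`; `options` must be
/// null or point to a valid `dtact_spawn_options_t`.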
#[unsafe(no_mangle)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::too_many_lines)]
pub unsafe extern "C" fn dtact_fiber_launch_with_cleanup_ext(
func: extern "C" fn(*mut c_void),
arg: *mut c_void,
cleanup: unsafe extern "C" fn(*mut c_void),
options: *const dtact_spawn_options_t,
) -> dtact_handle_t {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
let ctx_ptr = pool.get_context_ptr(ctx_id);
let current_core = crate::api::topology::current().core_id as usize;
let opts = if options.is_null() {
dtact_default_spawn_options()
} else {
unsafe { *options }
};
unsafe {
(*ctx_ptr).state.store(
crate::memory_management::FiberStatus::Running as u32,
core::sync::atomic::Ordering::Release,
);
(*ctx_ptr).origin_core = current_core as u16;
(*ctx_ptr).fiber_index = ctx_id;
(*ctx_ptr).switch_fn = match opts.switcher {
1 => crate::context_switch::switch_context_cross_thread_no_float,
2 => crate::context_switch::switch_context_same_thread_float,
3 => crate::context_switch::switch_context_same_thread_no_float,
_ => crate::context_switch::switch_context_cross_thread_float,
};
(*ctx_ptr).kind = match opts.kind {
1 => crate::common_types::WorkloadKind::IO,
2 => crate::common_types::WorkloadKind::Memory,
3 => crate::common_types::WorkloadKind::System,
_ => crate::common_types::WorkloadKind::Compute,
};
(*ctx_ptr).adaptive_spin_count = match (*ctx_ptr).kind {
crate::common_types::WorkloadKind::Compute => 1000,
crate::common_types::WorkloadKind::IO => 100,
crate::common_types::WorkloadKind::Memory => 500,
crate::common_types::WorkloadKind::System => 200,
};
(*ctx_ptr).closure_ptr = arg.cast::<()>();
(*ctx_ptr).trampoline =
core::mem::transmute::<extern "C" fn(*mut c_void), unsafe extern "C" fn()>(func);
(*ctx_ptr).cleanup_fn = Some(core::mem::transmute::<
unsafe extern "C" fn(*mut c_void),
unsafe extern "C" fn(*mut ()),
>(cleanup));
(*ctx_ptr).invoke_closure = |ptr| {
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
if let Some(ctx) = unsafe { ctx_ptr.as_ref() } {
let f: extern "C" fn(*mut c_void) = unsafe {
core::mem::transmute::<unsafe extern "C" fn(), extern "C" fn(*mut c_void)>(
ctx.trampoline,
)
};
f(ptr.cast::<c_void>());
}
};
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top = (ctx_ptr as usize & !0xF) - 72;
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
let stack_top = (ctx_ptr as usize & !0xF) - 80;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
let stack_top_ptr = stack_top as *mut u64;
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
core::ptr::write(stack_top_ptr, dtact_abort as *const () as u64);
let stack_top = stack_top as *mut u8;
#[cfg(target_arch = "x86_64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[7] = crate::api::fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[10] = ctx_ptr as u64;
(*ctx_ptr).regs.gprs[11] = limit as u64;
(*ctx_ptr).regs.gprs[12] = limit as u64;
(*ctx_ptr).regs.gprs[13] = !0;
}
}
#[cfg(target_arch = "aarch64")]
{
(*ctx_ptr).regs.gprs[12] = stack_top as u64;
(*ctx_ptr).regs.gprs[11] = crate::api::fiber_entry_point as *const () as u64;
#[cfg(windows)]
{
let limit = (ctx_ptr as usize).saturating_sub(pool.slot_size);
(*ctx_ptr).regs.gprs[13] = ctx_ptr as u64;
(*ctx_ptr).regs.gprs[14] = limit as u64;
(*ctx_ptr).regs.gprs[15] = limit as u64;
}
}
#[cfg(target_arch = "riscv64")]
{
(*ctx_ptr).regs.gprs[0] = stack_top as u64;
(*ctx_ptr).regs.gprs[13] = crate::api::fiber_entry_point as *const () as u64;
}
}
let r#gen = u64::from(unsafe {
(*ctx_ptr)
.generation
.load(core::sync::atomic::Ordering::Acquire)
});
crate::wake_fiber(current_core, ctx_id);
dtact_handle_t(
u64::from(ctx_id) | ((current_core as u64) << 32) | ((r#gen & 0x7FFF) << 48) | (1 << 63),
)
}
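/// Blocks until the fiber identified by `handle` finishes. Called from a
/// plain OS thread it spin/yield-polls; called from a fiber it registers
/// itself as the target's waiter and suspends to its executor until woken.
/// A stale handle (generation mismatch after slot reuse) returns
/// immediately.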
#[unsafe(no_mangle)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::too_many_lines)]
pub extern "C" fn dtact_await(handle: dtact_handle_t) {
    let handle_val = handle.0 & !(1 << 63);
    let target_ctx_id = (handle_val & 0xFFFF_FFFF) as u32;
    let handle_gen = ((handle_val >> 48) & 0x7FFF) as u16;
    let runtime = crate::GLOBAL_RUNTIME
        .get()
        .expect("Dtact Runtime not initialized");
let pool = &runtime.pool;
let target_ctx = pool.get_context_ptr(target_ctx_id);
let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
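    // CURRENT_FIBER is null when `dtact_await` is called from a plain OS
    // thread rather than from a fiber, so there is nothing to suspend:
    // poll the target's status instead.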
if ctx_ptr.is_null() {
let mut spins = 0u32;
loop {
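            // Seqlock-style read: sample the generation, then the status,
            // then the generation again. If either sample disagrees with the
            // handle's generation, the slot was recycled and the awaited
            // fiber is already gone.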
let (_current_gen, status) = unsafe {
let g1 = (*target_ctx)
.generation
.load(core::sync::atomic::Ordering::Acquire) as u16;
#[cfg(any(target_arch = "aarch64", target_arch = "riscv64"))]
core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
let status = (*target_ctx)
.state
.load(core::sync::atomic::Ordering::Acquire);
#[cfg(any(target_arch = "aarch64", target_arch = "riscv64"))]
core::sync::atomic::fence(core::sync::atomic::Ordering::SeqCst);
let g2 = (*target_ctx)
.generation
.load(core::sync::atomic::Ordering::Acquire) as u16;
if (g1 & 0x7FFF) != handle_gen || (g2 & 0x7FFF) != handle_gen {
break;
}
(g1 & 0x7FFF, status)
};
if status == crate::memory_management::FiberStatus::Finished as u32 {
break;
}
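            // Spin for the first 1000 iterations, then alternate one
            // `yield_now` with another 500 spins.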
if spins < 1000 {
core::hint::spin_loop();
spins += 1;
} else {
std::thread::yield_now();
                spins = 500;
            }
}
return;
}
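    // Fiber path: each iteration marks this fiber Running, re-checks the
    // target, publishes this fiber as the target's waiter, re-checks again
    // to close the race with a concurrent finish, and only then suspends
    // back to the executor. A wakeup at any point restarts the loop.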
loop {
unsafe {
(*ctx_ptr).state.swap(
crate::memory_management::FiberStatus::Running as u32,
core::sync::atomic::Ordering::AcqRel,
);
}
let (current_gen, status) = unsafe {
(
((*target_ctx)
.generation
.load(core::sync::atomic::Ordering::Acquire) as u16)
& 0x7FFF,
(*target_ctx)
.state
.load(core::sync::atomic::Ordering::Acquire),
)
};
if current_gen != handle_gen
|| status == crate::memory_management::FiberStatus::Finished as u32
{
unsafe {
(*target_ctx)
.waiter_handle
.store(0, core::sync::atomic::Ordering::Relaxed);
}
break;
}
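        // Publish this fiber as the target's waiter, encoded like a launch
        // handle (fiber index, worker id << 32, validity bit 63) but without
        // the generation bits.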
let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);
let current_ctx_id = unsafe { u64::from((*ctx_ptr).fiber_index) };
let my_handle = current_ctx_id | ((current_worker as u64) << 32) | (1 << 63);
unsafe {
(*target_ctx)
.waiter_handle
.swap(my_handle, core::sync::atomic::Ordering::SeqCst);
}
let (current_gen_post, status_post) = unsafe {
(
((*target_ctx)
.generation
.load(core::sync::atomic::Ordering::Acquire) as u16)
& 0x7FFF,
(*target_ctx)
.state
.load(core::sync::atomic::Ordering::Acquire),
)
};
if current_gen_post != handle_gen
|| status_post == crate::memory_management::FiberStatus::Finished as u32
{
unsafe {
(*target_ctx)
.waiter_handle
.store(0, core::sync::atomic::Ordering::Relaxed);
}
break;
}
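        // Suspend only if nothing has woken this fiber since the swap above:
        // the CAS from Running to Suspending fails (and the loop retries) if
        // a waker already flipped the state.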
unsafe {
let ctx = &mut *ctx_ptr;
if ctx
.state
.compare_exchange(
crate::memory_management::FiberStatus::Running as u32,
crate::memory_management::FiberStatus::Suspending as u32,
core::sync::atomic::Ordering::Release,
core::sync::atomic::Ordering::Acquire,
)
.is_ok()
{
(ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
}
}
}
}
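/// Runs the scheduler: spawns one OS thread per worker and blocks until all
/// workers exit (i.e. until shutdown is requested). The `_rt` parameter is
/// unused; the runtime is the process-global singleton.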
#[unsafe(no_mangle)]
pub extern "C" fn dtact_run(_rt: *mut c_void) {
let runtime = crate::GLOBAL_RUNTIME
.get()
.expect("Dtact Runtime not initialized");
let scheduler = &runtime.scheduler;
let workers_count = scheduler.workers.len();
let mut handles = alloc::vec::Vec::with_capacity(workers_count);
for i in 0..workers_count {
let handle = std::thread::spawn(move || {
if let Some(runtime) = crate::GLOBAL_RUNTIME.get() {
crate::dta_scheduler::DtaScheduler::run_worker_static(
&runtime.scheduler,
i,
&runtime.pool,
&runtime.shutdown,
);
}
});
handles.push(handle);
}
for h in handles {
let _ = h.join();
}
}
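/// Requests shutdown: sets the global shutdown flag, then bumps and
/// futex-wakes every worker's event signal so parked workers observe the
/// flag and exit, allowing `dtact_run` to return.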
#[unsafe(no_mangle)]
pub extern "C" fn dtact_shutdown() {
if let Some(runtime) = crate::GLOBAL_RUNTIME.get() {
runtime
.shutdown
.store(true, core::sync::atomic::Ordering::SeqCst);
for i in 0..runtime.scheduler.workers.len() {
unsafe {
let worker = &*runtime.scheduler.workers[i].get();
worker
.event_signal
.fetch_add(1, core::sync::atomic::Ordering::SeqCst);
crate::utils::futex_wake(
(&raw const worker.event_signal).cast::<core::sync::atomic::AtomicU32>(),
);
}
}
}
}