dtact/lib.rs
1//! # Dtact-V3: Distributed Task-Aware Coroutine Toolkit
2//!
3//! Dtact is a high-performance, low-latency asynchronous runtime designed for systems-level
4//! programming across heterogeneous architectures (`x86_64`, `AArch64`, `RISC-V`).
5//!
6//! ## Core Architecture
7//! 1. **Lock-Free Arena**: A page-aligned memory pool for fiber contexts, providing O(1) allocation
8//! and hardware-level guard pages for memory safety.
9//! 2. **P2P Scheduler Mesh**: A distributed work-stealing/deflection scheduler that minimizes L3
10//! cache thrashing and maximizes NUMA-local execution.
11//! 3. **Zero-Copy Migration**: Leveraging self-referential futures and direct stack-top injection
12//! to move running tasks across cores without heap allocation.
13//!
14//! Dtact provides tiered safety levels (0-2) allowing developers to trade off between raw
15//! performance and hardware-enforced isolation (e.g., guard pages and SEH registration).
16
17// =========================================================================
18// RUST LINT CONFIGURATION: dtact
19// =========================================================================
20
21// -------------------------------------------------------------------------
22// LEVEL 1: CRITICAL ERRORS (Deny)
23// -------------------------------------------------------------------------
24#![deny(
25 unreachable_code,
26 improper_ctypes_definitions,
27 future_incompatible,
28 nonstandard_style,
29 rust_2018_idioms,
30 clippy::perf,
31 clippy::correctness,
32 clippy::suspicious,
33 clippy::unwrap_used,
34 clippy::expect_used,
35 clippy::indexing_slicing,
36 clippy::arithmetic_side_effects,
37 clippy::missing_safety_doc,
38 clippy::same_item_push,
39 clippy::implicit_clone,
40 clippy::all,
41 clippy::pedantic,
42 missing_docs,
43 clippy::nursery,
44 clippy::single_call_fn
45)]
46// -------------------------------------------------------------------------
47// LEVEL 2: STYLE WARNINGS (Warn)
48// -------------------------------------------------------------------------
49#![warn(
50 dead_code,
51 warnings,
52 clippy::dbg_macro,
53 clippy::todo,
54 clippy::cast_possible_truncation,
55 clippy::cast_sign_loss,
56 clippy::cast_possible_wrap,
57 clippy::unnecessary_safety_comment
58)]
59// -------------------------------------------------------------------------
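// LEVEL 3: ALLOW/IGNORABLE (Allow)
// NOTE(review): lint attributes on the same item are applied in order, so the
// `clippy::restriction` entry below appears to re-allow the restriction lints
// denied in LEVEL 1 (`unwrap_used`, `expect_used`, `indexing_slicing`,
// `arithmetic_side_effects`, `single_call_fn`) — confirm this is intended.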
61// -------------------------------------------------------------------------
62#![allow(
63 unsafe_code,
64 unused_unsafe,
65 private_interfaces,
66 clippy::restriction,
67 clippy::inline_always,
68 unused_doc_comments,
69 clippy::empty_line_after_doc_comments,
70 clippy::missing_const_for_thread_local
71)]
72#![crate_name = "dtact"]
73
74extern crate alloc;
75
76/// Set the deflection threshold for the DTA-V3 Scheduler.
77pub use crate::api::config::set_deflection_threshold;
78/// Spawn a fiber with a custom stack size.
79pub use crate::api::fiber::spawn_with_stack;
80/// Yield execution to another fiber.
81pub use crate::api::fiber::yield_to as yield_to_sync;
82/// Hardware-level demotion API.
83#[cfg(feature = "hw-acceleration")]
84pub use crate::api::hw::cldemote;
85/// Hardware-level interrupt signaling API.
86#[cfg(feature = "hw-acceleration")]
87pub use crate::api::hw::uintr_signal as uintr;
88/// Spawn a fiber.
89pub use crate::api::spawn;
90/// Yield execution to the scheduler.
91pub use crate::api::yield_now;
92/// Yield execution to another fiber.
93#[doc(hidden)]
94pub use crate::api::yield_to;
95/// Yield execution to another fiber.
96pub use crate::api::yield_to as yield_to_async;
97/// Wait for a fiber to complete.
98pub use crate::c_ffi::dtact_await;
99/// Handle for C-compatible FFI.
100pub use crate::c_ffi::dtact_handle_t;
101/// Wait for a fiber to complete.
102#[doc(hidden)]
103pub use crate::future_bridge::wait;
104/// Wait for a fiber to complete.
105pub use crate::future_bridge::wait as dtact_wait;
106/// Attribute macro for initializing the Dtact runtime.
107pub use dtact_macros::dtact_init;
108/// Attribute macro for exporting an async function to C.
109pub use dtact_macros::export_async;
110/// Attribute macro for exporting a fiber to C.
111pub use dtact_macros::export_fiber;
112/// Attribute macro for defining a Dtact task.
113pub use dtact_macros::task;
114
115/// Public user-facing API for spawning and managing fibers.
116#[doc(hidden)]
117pub mod api;
118/// C-compatible FFI boundary for cross-language integration.
119#[doc(hidden)]
120pub mod c_ffi;
121/// Common types used across the Dtact runtime.
122#[doc(hidden)]
123pub mod common_types;
124/// Low-level assembly-based context switching primitives.
125#[doc(hidden)]
126pub mod context_switch;
127/// Distributed P2P Mesh scheduler implementation.
128#[doc(hidden)]
129pub mod dta_scheduler;
130/// Bridge for polling futures within a `FiberContext`.
131#[doc(hidden)]
132pub mod future_bridge;
133/// Lock-free arena and OS-level memory management.
134#[doc(hidden)]
135pub mod memory_management;
136/// Timing, topology, and OS-specific primitives.
137#[doc(hidden)]
138pub mod utils;
139
140pub use api::*;
141
142/// DTA-V3 Runtime Environment.
143///
144/// Consolidates the distributed scheduler and the memory pool into a single
145/// unit to ensure architectural consistency across all worker threads.
146#[doc(hidden)]
147pub struct Runtime {
148 /// The distributed P2P work-deflection scheduler.
149 pub scheduler: dta_scheduler::DtaScheduler,
150 /// The lock-free arena for managing fiber stacks and contexts.
151 pub pool: memory_management::ContextPool,
152 /// Flag indicating if the worker threads have been started.
153 pub started: core::sync::atomic::AtomicBool,
154 /// Cooperative shutdown signal for worker threads.
155 pub shutdown: core::sync::atomic::AtomicBool,
156}
157
158impl Runtime {
159 /// Spawns the OS worker threads for the scheduler.
160 ///
161 /// # Panics
162 ///
163 /// Panics if the system fails to spawn a new thread. This can occur if
164 /// the operating system limits on the number of threads have been reached.
165 pub fn start(&'static self) {
166 if self
167 .started
168 .swap(true, core::sync::atomic::Ordering::SeqCst)
169 {
170 return;
171 }
172
173 let workers_count = self.scheduler.workers.len();
174
175 for i in 0..workers_count {
176 // Each closure must capture its own copy of these values.
177 let sched: &'static dta_scheduler::DtaScheduler = &self.scheduler;
178 let pool: &'static memory_management::ContextPool = &self.pool;
179 let shutdown: &'static core::sync::atomic::AtomicBool = &self.shutdown;
180 let my_id = i;
181
182 std::thread::Builder::new()
183 .name(format!("dtact-worker-{my_id}"))
184 .spawn(move || {
185 crate::dta_scheduler::DtaScheduler::run_worker_static(
186 sched, my_id, pool, shutdown,
187 );
188 })
189 .expect("Failed to spawn Dtact worker thread");
190 }
191 }
192}
193
194/// Global Singleton for the Runtime Environment.
195///
196/// This is initialized exactly once per process via `dtact_init` or
197/// implicit autostart triggers in the proc-macro layer.
198#[doc(hidden)]
199pub static GLOBAL_RUNTIME: std::sync::OnceLock<Runtime> = std::sync::OnceLock::new();
200
/// Telemetry counter for spawns that fell back to heap allocation.
///
/// Counts fibers that failed the 8KB zero-copy check. A high value means the
/// captured futures are larger than the pre-allocated stack-top buffer, which
/// causes a performance cliff due to heap traffic. The counter starts at zero.
#[doc(hidden)]
pub static HEAP_ESCAPED_SPAWNS: core::sync::atomic::AtomicU64 =
    core::sync::atomic::AtomicU64::new(0);
208
209/// Awakens a fiber by pushing it onto the scheduler mesh.
210///
211/// Dispatches between `enqueue_pinned` and `enqueue_deflect` based on the
212/// fiber's stored `mode`. Pinned fibers (`SameThread` switchers) skip the
213/// deflection hash and route strictly to their `origin_core`; deflectable
214/// fibers (`CrossThread` switchers) consult load and may hop / spill to the
215/// warehouse. The mode is set at spawn time and never changes.
216///
217/// # Arguments
218/// * `origin_core` - The core ID where the fiber was originally spawned.
219/// * `fiber_index` - The unique identifier of the fiber in the context pool.
220#[inline(always)]
221pub(crate) fn wake_fiber(origin_core: usize, fiber_index: u32) {
222 let runtime = GLOBAL_RUNTIME
223 .get()
224 .expect("dtact::wake_fiber() invoked before Runtime Initialization");
225 let pool = &runtime.pool;
226 let ctx_ptr = pool.get_context_ptr(fiber_index);
227 let pinned = matches!(
228 unsafe { (*ctx_ptr).mode },
229 common_types::TopologyMode::Pinned
230 ) || matches!(
231 unsafe { (*ctx_ptr).affinity },
232 crate::api::topology::Affinity::SameCore
233 );
234 let affinity = unsafe { (*ctx_ptr).affinity };
235
236 loop {
237 // Two-entry function-pointer table — branchless after the bool is computed.
238 type EnqFn = fn(
239 &dta_scheduler::DtaScheduler,
240 usize,
241 u64,
242 u32,
243 crate::api::topology::Affinity,
244 ) -> bool;
245 const ENQUEUE_FNS: [EnqFn; 2] = [enqueue_deflect_shim, enqueue_pinned_shim];
246 let success = ENQUEUE_FNS[usize::from(pinned)](
247 &runtime.scheduler,
248 origin_core,
249 u64::from(fiber_index),
250 fiber_index,
251 affinity,
252 );
253 if success {
254 return;
255 }
256
257 // Backpressure: enqueue_pinned can fail when the target's local queue
258 // is over the watermark AND the cross-core mailbox is full.
259 // (enqueue_deflect never returns false — it either places in a mailbox
260 // or panics via warehouse overflow.) Yield to give the scheduler a
261 // chance to drain, then retry.
262 backpressure_yield();
263 }
264}
265
266#[inline(always)]
267fn enqueue_pinned_shim(
268 sched: &dta_scheduler::DtaScheduler,
269 target: usize,
270 _flow: u64,
271 task: u32,
272 _affinity: crate::api::topology::Affinity,
273) -> bool {
274 sched.enqueue_pinned(target, task)
275}
276
277#[inline(always)]
278fn enqueue_deflect_shim(
279 sched: &dta_scheduler::DtaScheduler,
280 source: usize,
281 flow: u64,
282 task: u32,
283 affinity: crate::api::topology::Affinity,
284) -> bool {
285 sched.enqueue_deflect(source, flow, task, affinity)
286}
287
288/// State-guarded fiber wake by pool index.
289///
290/// Atomically swaps the target's `state` to `Notified`. Only enqueues
291/// the fiber via [`wake_fiber`] when the prior state was `Yielded`
292/// (the fiber is parked off-CPU and needs a worker to re-dispatch it).
293///
294/// For `Running` / `Suspending` the fiber is currently held by a worker;
295/// that worker's `dispatch_loop` observes `Notified` after `switch_fn`
296/// returns and re-pushes via `push_local`. Skipping the redundant
297/// external enqueue is what prevents a double-dispatch race on
298/// deflectable (`CrossThread`) fibers — without the guard, the same
299/// fiber index could land in two workers' queues, both call `switch_fn`
300/// into the same stack concurrently, clobber `executor_regs`, and leave
301/// one worker permanently stranded inside the fiber.
302///
303/// Mirrors the protocol applied inline by
304/// [`future_bridge::wake_by_ref_impl`](crate::future_bridge); the only
305/// difference is that this helper resolves the context via the pool
306/// because callers only hold the index, not a `&FiberContext`.
307///
308/// MUST NOT be used by spawn paths, which intentionally publish a new
309/// fiber while it is still in `Running` and rely on the unconditional
310/// `wake_fiber` enqueue for first dispatch.
311#[inline(always)]
312pub(crate) fn awaken_fiber_by_index(target_worker: usize, fiber_index: u32) {
313 let runtime = GLOBAL_RUNTIME
314 .get()
315 .expect("dtact::awaken_fiber_by_index() invoked before Runtime Initialization");
316 let ctx_ptr = runtime.pool.get_context_ptr(fiber_index);
317 let prev = unsafe {
318 (*ctx_ptr).state.swap(
319 crate::memory_management::FiberStatus::Notified as u32,
320 core::sync::atomic::Ordering::AcqRel,
321 )
322 };
323 if prev == crate::memory_management::FiberStatus::Yielded as u32 {
324 wake_fiber(target_worker, fiber_index);
325 }
326}
327
328/// Resolves an opaque waiter handle (encoded by `dtact_await`'s fiber path)
329/// and conditionally re-enqueues the waiting fiber via the state-guarded
330/// wake protocol — see [`awaken_fiber_by_index`] for the protocol details
331/// and the double-dispatch race it prevents.
332#[inline(always)]
333pub(crate) fn wake_waiter_handle(packed: u64) {
334 let waiter = packed & !(1u64 << 63);
335 let fiber_index = (waiter & 0xFFFF_FFFF) as u32;
336 // The stored `target_worker` is the worker the waiter was running on
337 // when it suspended — used as the routing source for `enqueue_deflect`
338 // if we actually need to enqueue.
339 let target_worker = (waiter >> 32) as usize;
340 awaken_fiber_by_index(target_worker, fiber_index);
341}
342
343/// Backpressure handler: cooperative yield if inside a fiber, brief
344/// spin + OS yield if on a host thread.
345#[inline]
346fn backpressure_yield() {
347 let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
348 if ctx_ptr.is_null() {
349 for _ in 0..32 {
350 core::hint::spin_loop();
351 }
352 std::thread::yield_now();
353 } else {
354 unsafe {
355 let ctx = &mut *ctx_ptr;
356 ctx.state.store(
357 crate::memory_management::FiberStatus::Notified as u32,
358 core::sync::atomic::Ordering::Release,
359 );
360 (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
361 }
362 }
363}
364
365#[allow(clippy::mixed_attributes_style)]
366#[cfg_attr(miri, ignore)]
367#[doc(hidden)]
368mod readme {
369 #![doc = include_str!("../README.md")]
370}