// dtact/api.rs

pub use crate::c_ffi::dtact_handle_t;
pub use crate::common_types::{TopologyMode, WorkloadKind};
pub use crate::memory_management::{ContextPool, FiberContext, FiberStatus, SafetyLevel};
use core::future::Future;
use core::pin::Pin;
pub use topology::Affinity;

/// Scheduling priority for fibers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    /// Background tasks with no latency requirements.
    Low,
    /// Standard application tasks.
    Normal,
    /// Latency-sensitive tasks that should preempt normal work.
    High,
    /// Critical real-time tasks that must run as soon as possible.
    Critical,
}

/// Interface for custom context switching logic.
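///
/// A minimal sketch of a custom implementation; `my_switch_routine` is a
/// hypothetical assembly routine, not part of this crate:
///
/// ```ignore
/// pub struct MySwitcher;
/// impl ContextSwitcher for MySwitcher {
///     const SWITCH_FN: unsafe extern "C" fn(
///         *mut crate::memory_management::Registers,
///         *const crate::memory_management::Registers,
///     ) = my_switch_routine; // hypothetical: save current regs, restore target regs
/// }
/// ```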
pub trait ContextSwitcher: Send + Sync + 'static {
    /// The raw assembly function used for switching to/from this fiber.
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    );
}

/// Standard switcher that saves/restores floating-point state and supports cross-thread migration.
pub struct CrossThreadFloat;
impl ContextSwitcher for CrossThreadFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_cross_thread_float;
}

/// Lightweight switcher that skips floating-point state but supports cross-thread migration.
pub struct CrossThreadNoFloat;
impl ContextSwitcher for CrossThreadNoFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_cross_thread_no_float;
}

/// Optimized switcher for fibers pinned to a single thread, saving/restoring floating-point state.
pub struct SameThreadFloat;
impl ContextSwitcher for SameThreadFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_same_thread_float;
}

/// The fastest switcher: for fibers pinned to a single thread, skipping floating-point state.
pub struct SameThreadNoFloat;
impl ContextSwitcher for SameThreadNoFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_same_thread_no_float;
}

/// Fluent builder for configuring and launching fibers.
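///
/// A minimal usage sketch (hypothetical paths; assumes these items are
/// re-exported at the crate root and the runtime is already initialized):
///
/// ```no_run
/// let handle = dtact::spawn_with()
///     .name("io-worker")
///     .kind(dtact::WorkloadKind::IO)
///     .priority(dtact::Priority::High)
///     .spawn(async {
///         // latency-sensitive work here
///     });
/// ```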
pub struct SpawnBuilder<S: ContextSwitcher = CrossThreadFloat> {
    name: Option<&'static str>,
    affinity: topology::Affinity,
    priority: Priority,
    kind: WorkloadKind,
    mode: TopologyMode,
    safety: crate::memory_management::SafetyLevel,
    _marker: core::marker::PhantomData<S>,
}

impl<S: ContextSwitcher> Default for SpawnBuilder<S> {
    #[inline(always)]
    fn default() -> Self {
        Self::new()
    }
}

impl<S: ContextSwitcher> SpawnBuilder<S> {
    /// Creates a new builder with default settings:
    /// Normal priority, Compute kind, P2P Mesh mode, `SameCore` affinity, and Safety0 (raw performance).
    #[inline(always)]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            name: None,
            affinity: topology::Affinity::SameCore,
            priority: Priority::Normal,
            kind: WorkloadKind::Compute,
            mode: TopologyMode::P2PMesh,
            safety: crate::memory_management::SafetyLevel::Safety0,
            _marker: core::marker::PhantomData,
        }
    }

    /// Sets the workload kind (`Compute`, `IO`, `Memory`, or `System`).
    #[inline(always)]
    #[must_use]
    pub const fn kind(mut self, kind: WorkloadKind) -> Self {
        self.kind = kind;
        self
    }

    /// Sets the topology mode (P2P Mesh or Local Queue).
    #[inline(always)]
    #[must_use]
    pub const fn topology_mode(mut self, mode: TopologyMode) -> Self {
        self.mode = mode;
        self
    }

    /// Sets the hardware safety level (0-2).
    #[inline(always)]
    #[must_use]
    pub const fn safety(mut self, safety: crate::memory_management::SafetyLevel) -> Self {
        self.safety = safety;
        self
    }

    /// Sets a descriptive name for the fiber (useful for telemetry).
    #[inline(always)]
    #[must_use]
    pub const fn name(mut self, name: &'static str) -> Self {
        self.name = Some(name);
        self
    }

    /// Sets the core affinity (`SameCore`, `SameNUMA`, etc.).
    #[inline(always)]
    #[must_use]
    pub const fn affinity(mut self, affinity: topology::Affinity) -> Self {
        self.affinity = affinity;
        self
    }

    /// Sets the scheduling priority.
    #[inline(always)]
    #[must_use]
    pub const fn priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }

    /// Switches the context-switching strategy (e.g. `SameThreadNoFloat`).
    #[inline(always)]
    #[must_use]
    pub const fn switcher<NewS: ContextSwitcher>(self) -> SpawnBuilder<NewS> {
        SpawnBuilder {
            name: self.name,
            affinity: self.affinity,
            priority: self.priority,
            kind: self.kind,
            mode: self.mode,
            safety: self.safety,
            _marker: core::marker::PhantomData,
        }
    }

    /// Finalizes and launches the fiber into the runtime.
    ///
    /// This performs the critical "Zero-Copy" layout calculation:
    /// 1. Attempts to place the Future directly at the top of the fiber stack.
    /// 2. If the Future does not fit the 8KB buffer (too large or over-aligned),
    ///    falls back to heap allocation.
    /// 3. Configures the assembly trampoline for the selected `ContextSwitcher`.
    ///
    /// # Panics
    /// * Panics if the runtime is not initialized.
    /// * Panics if the context pool is exhausted.
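    ///
    /// # Layout example
    /// Illustrative numbers only (not taken from the implementation): with
    /// `buffer_start = 0x1000`, the 8KB buffer ends at `0x3000`. For a future
    /// with `size_of::<F>() = 256` and `align_of::<F>() = 16`, the future is
    /// written at `(0x3000 - 256) & !15 = 0x2F00`, and the fiber stack grows
    /// downward from just below `0x2F00` toward `0x1000`.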
    #[inline(always)]
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::useless_let_if_seq)]
    #[allow(clippy::too_many_lines)]
    pub fn spawn<F: Future + Send + 'static>(self, fut: F) -> dtact_handle_t {
        let runtime = crate::GLOBAL_RUNTIME
            .get()
            .expect("Dtact Runtime not initialized");
        let pool = &runtime.pool;
        let mut fixed_spins: u32 = 0;

        let ctx_id = 'alloc: loop {
            if let Some(id) = pool.alloc_context() {
                // If we are in a fiber, reward the success by growing this fiber's spin budget
                let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
                if !ctx_ptr.is_null() {
                    unsafe {
                        let ctx = &mut *ctx_ptr;
                        ctx.adaptive_spin_count = (ctx.adaptive_spin_count + 1).min(2000);
                        ctx.spin_failure_count = ctx.spin_failure_count.saturating_sub(1);
                    }
                }
                break 'alloc id;
            }

            let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
            if ctx_ptr.is_null() {
                // HOST-THREAD SPINNING
                if fixed_spins < 2000 {
                    core::hint::spin_loop();
                    fixed_spins += 1;

                    // Sparse Polling for host threads too
                    if fixed_spins.trailing_zeros() >= 3
                        && let Some(id) = pool.alloc_context()
                    {
                        break 'alloc id;
                    }
                } else {
                    std::thread::yield_now();
                    fixed_spins = 0; // Reset after yield
                }
            } else {
                // FIBER-AWARE ADAPTIVE SPINNING
                unsafe {
                    let ctx = &mut *ctx_ptr;
                    let current_spin = ctx.adaptive_spin_count;
                    let failure_count = ctx.spin_failure_count;

                    // Only spin if failure count is low
                    if failure_count < 20 {
                        for i in 0..current_spin {
                            core::hint::spin_loop();

                            // Sparse Polling: only check the pool every 8 iterations to reduce L1 pressure
                            if i.trailing_zeros() >= 3
                                && let Some(id) = pool.alloc_context()
                            {
                                ctx.adaptive_spin_count = (current_spin + 2).min(2000);
                                ctx.spin_failure_count = failure_count.saturating_sub(1);
                                break 'alloc id;
                            }
                        }
                    }

                    // Spin failed: Penalize budget and yield
                    ctx.spin_failure_count = failure_count.saturating_add(1);
                    ctx.adaptive_spin_count = current_spin.saturating_sub(100).max(200);

                    ctx.state.store(
                        crate::memory_management::FiberStatus::Notified as u8,
                        core::sync::atomic::Ordering::Release,
                    );
                    (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
                }
            }
        };

        let ctx_ptr = pool.get_context_ptr(ctx_id);
        let current_core = crate::future_bridge::CURRENT_WORKER_ID.with(|c| {
            let id = c.get();
            if id < runtime.scheduler.workers.len() {
                id
            } else {
                topology::current().core_id as usize % runtime.scheduler.workers.len()
            }
        });

        unsafe {
            (*ctx_ptr).state.store(
                crate::memory_management::FiberStatus::Running as u8,
                core::sync::atomic::Ordering::Release,
            );
            (*ctx_ptr).kind = self.kind;
            (*ctx_ptr).mode = self.mode;
            (*ctx_ptr).origin_core = current_core as u16;
            (*ctx_ptr).fiber_index = ctx_id;
            (*ctx_ptr).switch_fn = S::SWITCH_FN;

            // Set adaptive spin count based on workload kind
            (*ctx_ptr).adaptive_spin_count = match self.kind {
                WorkloadKind::Compute => 1000,
                WorkloadKind::IO => 100,
                WorkloadKind::Memory => 500,
                WorkloadKind::System => 200,
            };

            // Aligned Zero-Copy Future Migration
            let align = core::mem::align_of::<F>();
            let fut_size = core::mem::size_of::<F>();
            let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
            let buffer_end = buffer_start + 8192;
            let aligned_fut_addr = (buffer_end - fut_size) & !(align - 1);

            // Determine where the stack region ends (just below the future).
            // The stack grows DOWNWARD from this address toward buffer_start.
            let stack_limit: usize;

            if aligned_fut_addr < buffer_start || (aligned_fut_addr + fut_size) > buffer_end {
                // Future exceeds pre-allocated 8KB buffer. Fallback to heap.
                crate::HEAP_ESCAPED_SPAWNS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);

                #[cfg(debug_assertions)]
                {
                    static WARNED: core::sync::atomic::AtomicBool =
                        core::sync::atomic::AtomicBool::new(false);
                    if !WARNED.swap(true, core::sync::atomic::Ordering::Relaxed) {
                        eprintln!(
                            "DTA-V3 WARNING: Future exceeds or misaligns 8KB zero-copy buffer. Switching to heap-allocation mode."
                        );
                    }
                }

                let boxed = Box::new(fut);
                let fut_ptr = Box::into_raw(boxed);
                (*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();
                (*ctx_ptr).invoke_closure = |ptr| unsafe {
                    let mut f = Box::from_raw(ptr.cast::<F>());
                    let f_pinned = Pin::new_unchecked(&mut *f);
                    crate::future_bridge::wait_pinned(f_pinned);
                };
                (*ctx_ptr).cleanup_fn = None;

                // Heap path: entire 8KB buffer is available as stack
                stack_limit = buffer_end;
            } else {
                let fut_ptr = aligned_fut_addr as *mut F;
                core::ptr::write(fut_ptr, fut);

                (*ctx_ptr).invoke_closure = |ptr| {
                    let f_ptr = ptr.cast::<F>();
                    unsafe {
                        let f_pinned = Pin::new_unchecked(&mut *f_ptr);
                        crate::future_bridge::wait_pinned(f_pinned);
                        core::ptr::drop_in_place(f_ptr);
                    }
                };
                (*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();

                // Inline path: stack lives below the future
                stack_limit = aligned_fut_addr;
            }

            // ABI-compliant stack alignment
            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            let stack_top = (stack_limit & !0xF) - 8;
            #[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
            let stack_top = stack_limit & !0xF;

            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            let stack_top_ptr = stack_top as *mut u64;

            // Poison return address (dtact_abort) — if fiber_entry_point ever returns,
            // this triggers a controlled abort instead of undefined behavior.
            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);

            let stack_top = stack_top as *mut u8;

            #[cfg(target_arch = "x86_64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // RSP
                (*ctx_ptr).regs.gprs[7] = fiber_entry_point as *const () as u64; // RIP
                #[cfg(windows)]
                {
                    (*ctx_ptr).regs.gprs[10] = stack_limit as u64; // Stack Base
                    (*ctx_ptr).regs.gprs[11] = buffer_start as u64; // Stack Limit
                    (*ctx_ptr).regs.gprs[12] = buffer_start as u64; // DeallocationStack
                    (*ctx_ptr).regs.gprs[13] = !0; // ExceptionList
                }
            }
            #[cfg(target_arch = "aarch64")]
            {
                (*ctx_ptr).regs.gprs[12] = stack_top as u64; // SP
                (*ctx_ptr).regs.gprs[11] = fiber_entry_point as *const () as u64; // x30 (LR)
                #[cfg(windows)]
                {
                    (*ctx_ptr).regs.gprs[13] = stack_limit as u64; // Stack Base
                    (*ctx_ptr).regs.gprs[14] = buffer_start as u64; // Stack Limit
                    (*ctx_ptr).regs.gprs[15] = buffer_start as u64; // DeallocationStack
                }
            }
            #[cfg(target_arch = "riscv64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // SP
                (*ctx_ptr).regs.gprs[13] = fiber_entry_point as *const () as u64; // RA
            }
        }

        let r#gen = u64::from(unsafe {
            (*ctx_ptr)
                .generation
                .load(core::sync::atomic::Ordering::Acquire)
        });

        crate::wake_fiber(current_core, ctx_id);

        // Handle Layout: [1-bit Valid | 15-bit Generation | 16-bit CoreID | 32-bit ContextID]
        dtact_handle_t(
            u64::from(ctx_id)
                | ((current_core as u64) << 32)
                | ((r#gen & 0x7FFF) << 48) // 15-bit generation; bit 63 is the Valid flag below
                | (1 << 63),
        )
    }
}

pub(crate) unsafe extern "C" fn fiber_entry_point() {
    let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
    if ctx_ptr.is_null() {
        return;
    }

    let ctx = unsafe { &mut *ctx_ptr };
    let invoke = ctx.invoke_closure;
    let arg = ctx.closure_ptr;

    // Execute the task payload with SEH/Panic protection
    let _ = std::panic::catch_unwind(core::panic::AssertUnwindSafe(move || {
        unsafe { invoke(arg) };
    }));

    // Execute cleanup if present (e.g. FFI arg free) — MUST happen before we lose the context
    if let Some(cleanup) = ctx.cleanup_fn.take() {
        unsafe { cleanup(ctx.closure_ptr) };
    }

    // Mark as Finished. The scheduler will return this context to the pool
    // AFTER we switch back, preventing use-after-free races.
    ctx.state.store(
        crate::memory_management::FiberStatus::Finished as u8,
        core::sync::atomic::Ordering::Release,
    );

    // Wake up any fiber waiting for this one (FFI join).
    // MUST happen BEFORE free_context, otherwise the context could be reallocated
    // and the waiter_handle overwritten before we read it.
    let waiter = ctx
        .waiter_handle
        .swap(0, core::sync::atomic::Ordering::AcqRel);
    if waiter != 0 {
        let waiter = waiter & !(1 << 63); // Strip sentinel bit
        let waiter_ctx_id = (waiter & 0xFFFF_FFFF) as u32;
        let target_worker = ((waiter >> 32) & 0xFFFF) as usize; // mask off the generation bits

        if let Some(runtime) = crate::GLOBAL_RUNTIME.get() {
            let num_workers = runtime.scheduler.workers.len();
            let target_worker = target_worker % num_workers;
            let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);

            if current_worker == target_worker {
                // We are on the target worker's thread! Safe to push local.
                unsafe {
                    let worker = &mut *runtime.scheduler.workers[target_worker].get();
                    worker.push_local(waiter_ctx_id);
                }
            } else if current_worker < num_workers {
                // Cross-core wakeup: use the mailbox matrix.
                let mut chunk = crate::dta_scheduler::TaskChunk::default();
                chunk.tasks[0] = waiter_ctx_id;
                chunk.count = 1;
                let _ = runtime.scheduler.mailboxes[current_worker][target_worker].push(chunk);
            } else {
                // Fallback for non-worker threads: use global enqueue or pick a source
                let _ = runtime.scheduler.enqueue_task(
                    target_worker,
                    u64::from(waiter_ctx_id),
                    waiter_ctx_id,
                );
            }
        }
    }

    // Also wake any non-fiber thread blocked on futex_wait
    unsafe { crate::utils::futex_wake(&raw const ctx.state) };

    // Switch back to the scheduler. The scheduler's dispatch_loop will see
    // state == Finished and call free_context on our behalf.
    unsafe {
        (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
    }
}

/// Global epoch counter for hardware topology changes.
/// Incremented whenever a thread migration across CCX/NUMA boundaries is detected.
pub static TOPOLOGY_EPOCH: core::sync::atomic::AtomicU64 = core::sync::atomic::AtomicU64::new(0);

/// Hardware Topology Discovery and Affinity Management.
pub mod topology {
    /// Resumption affinity hints for the P2P Mesh scheduler.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Affinity {
        /// Resume on the same physical CPU core.
        SameCore,
        /// Resume on any core within the same Core Complex (CCX).
        SameCCX,
        /// Resume on any core within the same NUMA node.
        SameNUMA,
        /// No affinity preference.
        Any,
    }

    /// Returns the Core ID of the currently executing hardware thread.
    #[inline(always)]
    #[must_use]
    pub fn current_core() -> u16 {
        current().core_id
    }

    /// Hierarchical representation of a CPU core's location.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct CpuLevel {
        /// Logical Core ID.
        pub core_id: u16,
        /// Core Complex (L3 boundary) ID.
        pub ccx_id: u16,
        /// Non-Uniform Memory Access (NUMA) node ID.
        pub numa_id: u16,
    }

    /// Returns the hierarchical topology information for the current core.
    ///
    /// This function utilizes thread-local caching and adaptive refresh
    /// intervals to minimize the overhead of hardware discovery (e.g., CPUID).
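    ///
    /// A minimal sketch (hypothetical paths; the module may be re-exported
    /// elsewhere):
    ///
    /// ```no_run
    /// let here = dtact::api::topology::current();
    /// println!("core={} ccx={} numa={}", here.core_id, here.ccx_id, here.numa_id);
    /// ```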
    #[inline(always)]
    pub fn current() -> CpuLevel {
        thread_local! {
            static CACHED: core::cell::Cell<(CpuLevel, u64)> = const {
                core::cell::Cell::new((CpuLevel { core_id: 0, ccx_id: 0, numa_id: 0 }, 0))
            };
        }

        let (mut cpu, mut last_refresh) = CACHED.with(std::cell::Cell::get);
        let (now, cpu_id) = crate::utils::get_tick_with_cpu();

        // Refresh every 100k cycles OR if Core ID mismatch (vCPU migration)
        if now.wrapping_sub(last_refresh) > 100_000 || u32::from(cpu.core_id) != cpu_id {
            let next_cpu = current_raw();
            if next_cpu != cpu {
                crate::TOPOLOGY_EPOCH.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                cpu = next_cpu;
            }
            last_refresh = now;
            CACHED.with(|c| c.set((cpu, last_refresh)));
        }
        cpu
    }

    /// Performs a raw hardware topology discovery via CPUID/MPIDR.
    #[inline(always)]
    #[must_use]
    pub fn current_raw() -> CpuLevel {
        // The explicit rbx save/restore in the asm below is 64-bit only.
        #[cfg(target_arch = "x86_64")]
        {
            let (x2apic_id, core_shift, package_shift): (u32, u32, u32);

            unsafe {
                let (eax, edx_v): (u32, u32);
                core::arch::asm!(
                    "push rbx",
                    "cpuid",
                    "mov {ebx_out:e}, ebx",
                    "pop rbx",
                    ebx_out = out(reg) _,
                    inout("eax") 0x0B => eax,
                    inout("ecx") 0 => _,
                    out("edx") edx_v,
                );
                core_shift = eax;
                x2apic_id = edx_v;

                let eax_p: u32;
                core::arch::asm!(
                    "push rbx",
                    "cpuid",
                    "mov {ebx_out:e}, ebx",
                    "pop rbx",
                    ebx_out = out(reg) _,
                    inout("eax") 0x0B => eax_p,
                    inout("ecx") 1 => _,
                    out("edx") _,
                );
                package_shift = eax_p;
            }

            let core_id = x2apic_id & ((1 << core_shift) - 1);
            let ccx_id = (x2apic_id >> core_shift) & ((1 << (package_shift - core_shift)) - 1);
            let numa_id = x2apic_id >> package_shift;

            CpuLevel {
                core_id: (core_id & 0xFFFF) as u16,
                ccx_id: (ccx_id & 0xFFFF) as u16,
                numa_id: (numa_id & 0xFFFF) as u16,
            }
        }

        #[cfg(target_arch = "aarch64")]
        {
            let mpidr: u64;
            unsafe {
                core::arch::asm!("mrs {}, mpidr_el1", out(reg) mpidr, options(nomem, nostack, preserves_flags));
            }
            return CpuLevel {
                core_id: (mpidr & 0xFF) as u16,
                ccx_id: ((mpidr >> 8) & 0xFF) as u16,
                numa_id: ((mpidr >> 16) & 0xFF) as u16,
            };
        }

        #[cfg(target_arch = "riscv64")]
        {
            let hart_id: u64;
            unsafe {
                core::arch::asm!("csrr {}, mhartid", out(reg) hart_id, options(nomem, nostack, preserves_flags));
            }
            return CpuLevel {
                core_id: (hart_id & 0xFFFF) as u16,
                ccx_id: (hart_id >> 16) as u16,
                numa_id: 0,
            };
        }

        #[cfg(not(any(
            target_arch = "x86_64",
            target_arch = "aarch64",
            target_arch = "riscv64"
        )))]
        {
            CpuLevel {
                core_id: 0,
                ccx_id: 0,
                numa_id: 0,
            }
        }
    }
}

/// Spawns a new fiber and returns a handle for synchronization.
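///
/// A minimal sketch (hypothetical path; assumes the runtime is already
/// initialized):
///
/// ```no_run
/// let handle = dtact::spawn(async {
///     // fiber work here
/// });
/// ```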
#[inline(always)]
pub fn spawn<F: Future + Send + 'static>(fut: F) -> dtact_handle_t {
    SpawnBuilder::<CrossThreadFloat>::new().spawn(fut)
}

/// Returns a new `SpawnBuilder` for configuring a fiber.
#[inline(always)]
#[must_use]
pub const fn spawn_with() -> SpawnBuilder<CrossThreadFloat> {
    SpawnBuilder::new()
}

/// Fiber configuration and construction utilities.
#[doc(hidden)]
pub mod spawn {
    use super::{CrossThreadFloat, SpawnBuilder};
    /// Returns a new `SpawnBuilder` with default settings.
    #[inline(always)]
    #[must_use]
    #[doc(hidden)]
    pub const fn builder() -> SpawnBuilder<CrossThreadFloat> {
        SpawnBuilder::new()
    }
}

/// Fiber-local execution and synchronization utilities.
pub mod fiber {
    use super::{dtact_handle_t, topology};
    /// Spawns a fiber from a closure with a specific stack configuration.
    /// The stack-size string is currently accepted for API compatibility and ignored.
    ///
    /// # Panics
    /// * Panics if the runtime is not initialized.
    /// * Panics if the context pool is exhausted.
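    ///
    /// A minimal sketch (hypothetical usage; assumes the runtime is already
    /// initialized):
    ///
    /// ```no_run
    /// let handle = dtact::fiber::spawn_with_stack("8k", || {
    ///     // closure body runs on the fiber
    /// });
    /// ```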
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn spawn_with_stack<F: FnOnce() + Send + 'static>(
        _stack_size_str: &str,
        f: F,
    ) -> dtact_handle_t {
        let runtime = crate::GLOBAL_RUNTIME
            .get()
            .expect("Dtact Runtime not initialized");
        let pool = &runtime.pool;
        let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
        let ctx_ptr = pool.get_context_ptr(ctx_id);
        #[allow(clippy::cast_possible_truncation)]
        let current_core = topology::current().core_id as usize;

        unsafe {
            (*ctx_ptr).state.store(
                crate::memory_management::FiberStatus::Running as u8,
                core::sync::atomic::Ordering::Release,
            );
            (*ctx_ptr).origin_core = current_core as u16;
            (*ctx_ptr).fiber_index = ctx_id;
            (*ctx_ptr).switch_fn = crate::context_switch::switch_context_same_thread_no_float;

            let f_ptr = (*ctx_ptr).read_buffer_ptr.cast::<F>();
            core::ptr::write(f_ptr, f);
            (*ctx_ptr).invoke_closure = |ptr| {
                let f = core::ptr::read(ptr.cast::<F>());
                f();
            };
            (*ctx_ptr).closure_ptr = f_ptr.cast::<()>();

            // Point 1: Shadow Space Separation (Stack MUST start BELOW the 8KB Future buffer)
            let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
            let stack_top = (buffer_start & !0xF) - 72;
            let stack_top_ptr = stack_top as *mut u64;

            // Point 4: "Return-to-Nowhere" Protection
            core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);

            let stack_top = stack_top as *mut u8;

            #[cfg(target_arch = "x86_64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // RSP
                (*ctx_ptr).regs.gprs[7] = super::fiber_entry_point as *const () as u64; // RIP
                #[cfg(windows)]
                {
                    let limit = buffer_start.saturating_sub(pool.slot_size);
                    (*ctx_ptr).regs.gprs[10] = buffer_start as u64; // Stack Base
                    (*ctx_ptr).regs.gprs[11] = limit as u64; // Stack Limit
                    (*ctx_ptr).regs.gprs[12] = limit as u64; // DeallocationStack
                    (*ctx_ptr).regs.gprs[13] = !0; // ExceptionList
                }
            }
            #[cfg(target_arch = "aarch64")]
            {
                (*ctx_ptr).regs.gprs[12] = stack_top as u64; // SP
                (*ctx_ptr).regs.gprs[11] = super::fiber_entry_point as *const () as u64; // x30 (LR)
                #[cfg(windows)]
                {
                    let limit = buffer_start.saturating_sub(pool.slot_size);
                    (*ctx_ptr).regs.gprs[13] = buffer_start as u64; // Stack Base
                    (*ctx_ptr).regs.gprs[14] = limit as u64; // Stack Limit
                    (*ctx_ptr).regs.gprs[15] = limit as u64; // DeallocationStack
                }
            }
            #[cfg(target_arch = "riscv64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // SP
                (*ctx_ptr).regs.gprs[13] = super::fiber_entry_point as *const () as u64; // RA
            }
        }

        crate::wake_fiber(current_core, ctx_id);
        dtact_handle_t(u64::from(ctx_id) | ((current_core as u64) << 32))
    }

    /// Yields execution directly to another fiber.
    /// Note: This is a hint to the scheduler.
    #[inline(always)]
    pub fn yield_to(handle: dtact_handle_t) {
        let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
        if ctx_ptr.is_null() {
            return;
        }

        let target_ctx_id = (handle.0 & 0xFFFF_FFFF) as u32;
        let target_core_id = ((handle.0 >> 32) & 0xFFFF) as usize;

        crate::wake_fiber(target_core_id, target_ctx_id);

        unsafe {
            let ctx = &mut *ctx_ptr;
            ctx.state.store(
                crate::memory_management::FiberStatus::Suspending as u8,
                core::sync::atomic::Ordering::Release,
            );
            (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
        }
    }
}

/// Advanced Hardware Acceleration primitives.
#[cfg(feature = "hw-acceleration")]
pub mod hw {
    /// Hardware-assisted optimization: proactively push data toward the shared (L3) cache.
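    ///
    /// A minimal sketch (hypothetical scenario): after publishing data for a
    /// consumer on another core, demote the line so the consumer reads it from
    /// shared cache instead of a remote private cache.
    ///
    /// ```ignore
    /// let slot: u64 = 42;
    /// dtact::hw::cldemote(&slot as *const u64);
    /// ```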
    #[inline(always)]
    pub fn cldemote<T>(ptr: *const T) {
        #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
        unsafe {
            core::arch::asm!("cldemote [{}]", in(reg) ptr);
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!("dc cvac, {}", in(reg) ptr);
        }
        #[cfg(target_arch = "riscv64")]
        unsafe {
            core::arch::asm!("cbo.clean 0({0})", in(reg) ptr);
        }
    }

    /// User-mode interrupt wakeup signal.
    #[inline(always)]
    pub fn uintr_signal(target_cpu: usize) {
        #[cfg(target_arch = "x86_64")]
        unsafe {
            core::arch::asm!(
                "mov rax, {}",
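                // The raw bytes below encode SENDUIPI rax (assumption: emitted
                // as bytes so assemblers without UINTR support still accept it).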
                ".byte 0xf3, 0x0f, 0xc7, 0xf0",
                in(reg) target_cpu as u64,
                out("rax") _,
                options(nostack, preserves_flags),
            );
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!("sev", options(nostack, preserves_flags));
        }
        #[cfg(target_arch = "riscv64")]
        unsafe {
            core::arch::asm!("csrw uipi, {0}", in(reg) target_cpu);
        }
    }
}

/// Yields execution to the scheduler.
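///
/// A minimal sketch (hypothetical usage inside a spawned fiber):
///
/// ```no_run
/// async fn drain(work: Vec<u64>) {
///     for item in work {
///         let _ = item; // process item
///         dtact::yield_now().await; // give other fibers a turn
///     }
/// }
/// ```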
#[inline(always)]
pub async fn yield_now() {
    struct YieldNow(bool);
    impl Future for YieldNow {
        type Output = ();
        #[inline(always)]
        fn poll(
            mut self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            if self.0 {
                core::task::Poll::Ready(())
            } else {
                self.0 = true;
                cx.waker().wake_by_ref();
                core::task::Poll::Pending
            }
        }
    }
    YieldNow(false).await;
}

/// Yields execution to another fiber handle asynchronously.
#[inline(always)]
pub async fn yield_to(handle: dtact_handle_t) {
    let handle_val = handle.0 & !(1 << 63); // Strip sentinel bit
    let target_ctx_id = (handle_val & 0xFFFF_FFFF) as u32;
    let target_core_id = ((handle_val >> 32) & 0xFFFF) as usize;
    crate::wake_fiber(target_core_id, target_ctx_id);
    yield_now().await;
}

/// Global Runtime Configuration and Telemetry.
pub mod config {
    use core::sync::atomic::Ordering;
    /// Sets the work-deflection threshold for a specific hardware worker.
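    ///
    /// A minimal sketch (illustrative values only):
    ///
    /// ```no_run
    /// // Make worker 0 deflect incoming work at a lower backlog.
    /// dtact::config::set_deflection_threshold(0, 4);
    /// ```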
    #[inline(always)]
    pub fn set_deflection_threshold(core_id: usize, threshold: u8) {
        if let Some(runtime) = crate::GLOBAL_RUNTIME.get()
            && core_id < runtime.scheduler.workers.len()
        {
            unsafe {
                let worker = &*runtime.scheduler.workers[core_id].get();
                worker
                    .deflection_threshold
                    .store(threshold, Ordering::Release);
            }
        }
    }
}

/// Extension trait for blocking on asynchronous futures from within a fiber.
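///
/// A minimal sketch (assumes the trait is in scope and the caller is running
/// inside a fiber):
///
/// ```no_run
/// use dtact::DtactWaitExt;
/// let value = async { 40 + 2 }.wait();
/// assert_eq!(value, 42);
/// ```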
pub trait DtactWaitExt {
    /// The type of value produced by the future.
    type Output;
    /// Blocks the current fiber until the future resolves.
    fn wait(self) -> Self::Output;
}

impl<F: Future> DtactWaitExt for F {
    type Output = F::Output;
    #[inline(always)]
    fn wait(self) -> Self::Output {
        crate::future_bridge::wait(self)
    }
}