pub use crate::c_ffi::dtact_handle_t;
pub use crate::common_types::{TopologyMode, WorkloadKind};
pub use crate::memory_management::{ContextPool, FiberContext, FiberStatus, SafetyLevel};
use core::future::Future;
use core::pin::Pin;
pub use topology::Affinity;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    Low,
    Normal,
    High,
    Critical,
}

pub trait ContextSwitcher: Send + Sync + 'static {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    );
}

pub struct CrossThreadFloat;
impl ContextSwitcher for CrossThreadFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_cross_thread_float;
}

pub struct CrossThreadNoFloat;
impl ContextSwitcher for CrossThreadNoFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_cross_thread_no_float;
}

pub struct SameThreadFloat;
impl ContextSwitcher for SameThreadFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_same_thread_float;
}

pub struct SameThreadNoFloat;
impl ContextSwitcher for SameThreadNoFloat {
    const SWITCH_FN: unsafe extern "C" fn(
        *mut crate::memory_management::Registers,
        *const crate::memory_management::Registers,
    ) = crate::context_switch::switch_context_same_thread_no_float;
}

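/// Typed builder for spawning fibers. The `S` parameter selects the
/// context-switch routine at compile time, defaulting to the safest
/// cross-thread, FPU-preserving variant.
///
/// A minimal usage sketch (assumes the global runtime has already been
/// initialized; the `WorkloadKind`/`Priority` values are illustrative):
///
/// ```ignore
/// let handle = SpawnBuilder::<CrossThreadFloat>::new()
///     .name("decoder")
///     .kind(WorkloadKind::Compute)
///     .priority(Priority::High)
///     .switcher::<SameThreadNoFloat>() // opt out of FPU save/restore
///     .spawn(async { /* ... */ });
/// ```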
pub struct SpawnBuilder<S: ContextSwitcher = CrossThreadFloat> {
    name: Option<&'static str>,
    affinity: topology::Affinity,
    priority: Priority,
    kind: WorkloadKind,
    mode: TopologyMode,
    safety: crate::memory_management::SafetyLevel,
    _marker: core::marker::PhantomData<S>,
}

impl<S: ContextSwitcher> Default for SpawnBuilder<S> {
    #[inline(always)]
    fn default() -> Self {
        Self::new()
    }
}

impl<S: ContextSwitcher> SpawnBuilder<S> {
    #[inline(always)]
    #[must_use]
    pub const fn new() -> Self {
        Self {
            name: None,
            affinity: topology::Affinity::SameCore,
            priority: Priority::Normal,
            kind: WorkloadKind::Compute,
            mode: TopologyMode::P2PMesh,
            safety: crate::memory_management::SafetyLevel::Safety0,
            _marker: core::marker::PhantomData,
        }
    }

    #[inline(always)]
    #[must_use]
    pub const fn kind(mut self, kind: WorkloadKind) -> Self {
        self.kind = kind;
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn topology_mode(mut self, mode: TopologyMode) -> Self {
        self.mode = mode;
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn safety(mut self, safety: crate::memory_management::SafetyLevel) -> Self {
        self.safety = safety;
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn name(mut self, name: &'static str) -> Self {
        self.name = Some(name);
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn affinity(mut self, affinity: topology::Affinity) -> Self {
        self.affinity = affinity;
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn priority(mut self, priority: Priority) -> Self {
        self.priority = priority;
        self
    }

    #[inline(always)]
    #[must_use]
    pub const fn switcher<NewS: ContextSwitcher>(self) -> SpawnBuilder<NewS> {
        SpawnBuilder {
            name: self.name,
            affinity: self.affinity,
            priority: self.priority,
            kind: self.kind,
            mode: self.mode,
            safety: self.safety,
            _marker: core::marker::PhantomData,
        }
    }

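    /// Allocates a fiber context, places `fut` into its zero-copy buffer (or
    /// on the heap if it does not fit), seeds the initial register state, and
    /// wakes the fiber on the current worker. Returns the packed handle.
    ///
    /// A usage sketch (assumes an initialized runtime):
    ///
    /// ```ignore
    /// let handle = spawn_with().name("io-task").spawn(async { /* ... */ });
    /// ```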
    #[inline(always)]
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::useless_let_if_seq)]
    #[allow(clippy::too_many_lines)]
    pub fn spawn<F: Future + Send + 'static>(self, fut: F) -> dtact_handle_t {
        let runtime = crate::GLOBAL_RUNTIME
            .get()
            .expect("Dtact Runtime not initialized");
        let pool = &runtime.pool;
        let mut fixed_spins: u32 = 0;

        // Allocate a context slot, spinning adaptively while the pool is full.
        let ctx_id = 'alloc: loop {
            if let Some(id) = pool.alloc_context() {
                let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
                if !ctx_ptr.is_null() {
                    unsafe {
                        let ctx = &mut *ctx_ptr;
                        ctx.adaptive_spin_count = (ctx.adaptive_spin_count + 1).min(2000);
                        ctx.spin_failure_count = ctx.spin_failure_count.saturating_sub(1);
                    }
                }
                break 'alloc id;
            }

            let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
            if ctx_ptr.is_null() {
                // Not on a fiber: plain bounded spinning, re-checking the pool
                // every eighth iteration, then yielding the OS thread.
                if fixed_spins < 2000 {
                    core::hint::spin_loop();
                    fixed_spins += 1;

                    if fixed_spins.trailing_zeros() >= 3
                        && let Some(id) = pool.alloc_context()
                    {
                        break 'alloc id;
                    }
                } else {
                    std::thread::yield_now();
                    fixed_spins = 0;
                }
            } else {
                unsafe {
                    let ctx = &mut *ctx_ptr;
                    let current_spin = ctx.adaptive_spin_count;
                    let failure_count = ctx.spin_failure_count;

                    if failure_count < 20 {
                        for i in 0..current_spin {
                            core::hint::spin_loop();

                            if i.trailing_zeros() >= 3
                                && let Some(id) = pool.alloc_context()
                            {
                                ctx.adaptive_spin_count = (current_spin + 2).min(2000);
                                ctx.spin_failure_count = failure_count.saturating_sub(1);
                                break 'alloc id;
                            }
                        }
                    }

                    // Spinning failed: shrink the budget and yield this fiber
                    // back to the executor before retrying.
                    ctx.spin_failure_count = failure_count.saturating_add(1);
                    ctx.adaptive_spin_count = current_spin.saturating_sub(100).max(200);

                    ctx.state.store(
                        crate::memory_management::FiberStatus::Notified as u8,
                        core::sync::atomic::Ordering::Release,
                    );
                    (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
                }
            }
        };

        let ctx_ptr = pool.get_context_ptr(ctx_id);
        let current_core = crate::future_bridge::CURRENT_WORKER_ID.with(|c| {
            let id = c.get();
            if id < runtime.scheduler.workers.len() {
                id
            } else {
                topology::current().core_id as usize % runtime.scheduler.workers.len()
            }
        });

        unsafe {
            (*ctx_ptr).state.store(
                crate::memory_management::FiberStatus::Running as u8,
                core::sync::atomic::Ordering::Release,
            );
            (*ctx_ptr).kind = self.kind;
            (*ctx_ptr).mode = self.mode;
            (*ctx_ptr).origin_core = current_core as u16;
            (*ctx_ptr).fiber_index = ctx_id;
            (*ctx_ptr).switch_fn = S::SWITCH_FN;

            // The initial spin budget is workload-dependent.
            (*ctx_ptr).adaptive_spin_count = match self.kind {
                WorkloadKind::Compute => 1000,
                WorkloadKind::IO => 100,
                WorkloadKind::Memory => 500,
                WorkloadKind::System => 200,
            };

            // Place the future at the aligned top of the fiber's 8 KiB
            // zero-copy buffer; the stack will grow down below it.
            let align = core::mem::align_of::<F>();
            let fut_size = core::mem::size_of::<F>();
            let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
            let buffer_end = buffer_start + 8192;
            let aligned_fut_addr = (buffer_end - fut_size) & !(align - 1);

            let stack_limit: usize;

            if aligned_fut_addr < buffer_start || (aligned_fut_addr + fut_size) > buffer_end {
                // The future does not fit: fall back to a heap allocation.
                crate::HEAP_ESCAPED_SPAWNS.fetch_add(1, core::sync::atomic::Ordering::Relaxed);

                #[cfg(debug_assertions)]
                {
                    static WARNED: core::sync::atomic::AtomicBool =
                        core::sync::atomic::AtomicBool::new(false);
                    if !WARNED.swap(true, core::sync::atomic::Ordering::Relaxed) {
                        eprintln!(
                            "DTA-V3 WARNING: Future exceeds or misaligns 8KB zero-copy buffer. Switching to heap-allocation mode."
                        );
                    }
                }

                let boxed = Box::new(fut);
                let fut_ptr = Box::into_raw(boxed);
                (*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();
                (*ctx_ptr).invoke_closure = |ptr| unsafe {
                    // Re-box so the future is dropped and freed after completion.
                    let mut f = Box::from_raw(ptr.cast::<F>());
                    let f_pinned = Pin::new_unchecked(&mut *f);
                    crate::future_bridge::wait_pinned(f_pinned);
                };
                (*ctx_ptr).cleanup_fn = None;

                stack_limit = buffer_end;
            } else {
                let fut_ptr = aligned_fut_addr as *mut F;
                core::ptr::write(fut_ptr, fut);

                (*ctx_ptr).invoke_closure = |ptr| {
                    let f_ptr = ptr.cast::<F>();
                    unsafe {
                        let f_pinned = Pin::new_unchecked(&mut *f_ptr);
                        crate::future_bridge::wait_pinned(f_pinned);
                        core::ptr::drop_in_place(f_ptr);
                    }
                };
                (*ctx_ptr).closure_ptr = fut_ptr.cast::<()>();

                stack_limit = aligned_fut_addr;
            }

            // 16-byte align the stack top; x86 additionally reserves a slot
            // for the sentinel return address written below.
            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            let stack_top = (stack_limit & !0xF) - 8;
            #[cfg(not(any(target_arch = "x86", target_arch = "x86_64")))]
            let stack_top = stack_limit & !0xF;

            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            let stack_top_ptr = stack_top as *mut u64;

            // Plant `dtact_abort` as the return address: a fiber that falls
            // off its entry point aborts instead of executing garbage.
            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
            core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);

            let stack_top = stack_top as *mut u8;

            #[cfg(target_arch = "x86_64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[7] = fiber_entry_point as *const () as u64; // resume address
                #[cfg(windows)]
                {
                    // Windows: seed the saved stack-bounds bookkeeping slots.
                    (*ctx_ptr).regs.gprs[10] = stack_limit as u64;
                    (*ctx_ptr).regs.gprs[11] = buffer_start as u64;
                    (*ctx_ptr).regs.gprs[12] = buffer_start as u64;
                    (*ctx_ptr).regs.gprs[13] = !0;
                }
            }
            #[cfg(target_arch = "aarch64")]
            {
                (*ctx_ptr).regs.gprs[12] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[11] = fiber_entry_point as *const () as u64; // resume address
                #[cfg(windows)]
                {
                    (*ctx_ptr).regs.gprs[13] = stack_limit as u64;
                    (*ctx_ptr).regs.gprs[14] = buffer_start as u64;
                    (*ctx_ptr).regs.gprs[15] = buffer_start as u64;
                }
            }
            #[cfg(target_arch = "riscv64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[13] = fiber_entry_point as *const () as u64; // resume address
            }
        }

        let r#gen = u64::from(unsafe {
            (*ctx_ptr)
                .generation
                .load(core::sync::atomic::Ordering::Acquire)
        });

        crate::wake_fiber(current_core, ctx_id);

        // Handle layout: ctx_id in bits 0..32, worker in 32..48, generation
        // tag in 48..64 (its top bit coincides with the always-set validity
        // flag in bit 63).
        dtact_handle_t(
            u64::from(ctx_id)
                | ((current_core as u64) << 32)
                | ((r#gen & 0xFFFF) << 48)
                | (1 << 63),
        )
    }
}

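/// Common entry point for every fiber: runs the stored closure behind a panic
/// guard, runs any registered cleanup, publishes `Finished`, re-schedules a
/// registered waiter, and finally switches back to the executor.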
pub(crate) unsafe extern "C" fn fiber_entry_point() {
    let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
    if ctx_ptr.is_null() {
        return;
    }

    let ctx = unsafe { &mut *ctx_ptr };
    let invoke = ctx.invoke_closure;
    let arg = ctx.closure_ptr;

    // A panic must not unwind across the `extern "C"` boundary.
    let _ = std::panic::catch_unwind(core::panic::AssertUnwindSafe(move || {
        unsafe { invoke(arg) };
    }));

    if let Some(cleanup) = ctx.cleanup_fn.take() {
        unsafe { cleanup(ctx.closure_ptr) };
    }

    ctx.state.store(
        crate::memory_management::FiberStatus::Finished as u8,
        core::sync::atomic::Ordering::Release,
    );

    // If another fiber is blocked on this one, hand it back to a worker.
    let waiter = ctx
        .waiter_handle
        .swap(0, core::sync::atomic::Ordering::AcqRel);
    if waiter != 0 {
        let waiter = waiter & !(1 << 63); // strip the validity flag
        let waiter_ctx_id = (waiter & 0xFFFF_FFFF) as u32;
        let target_worker = (waiter >> 32) as usize;

        if let Some(runtime) = crate::GLOBAL_RUNTIME.get() {
            let num_workers = runtime.scheduler.workers.len();
            let target_worker = target_worker % num_workers;
            let current_worker = crate::future_bridge::CURRENT_WORKER_ID.with(std::cell::Cell::get);

            if current_worker == target_worker {
                unsafe {
                    let worker = &mut *runtime.scheduler.workers[target_worker].get();
                    worker.push_local(waiter_ctx_id);
                }
            } else if current_worker < num_workers {
                let mut chunk = crate::dta_scheduler::TaskChunk::default();
                chunk.tasks[0] = waiter_ctx_id;
                chunk.count = 1;
                let _ = runtime.scheduler.mailboxes[current_worker][target_worker].push(chunk);
            } else {
                let _ = runtime.scheduler.enqueue_task(
                    target_worker,
                    u64::from(waiter_ctx_id),
                    waiter_ctx_id,
                );
            }
        }
    }

    unsafe { crate::utils::futex_wake(&raw const ctx.state) };

    // Switch back to the executor; this fiber never resumes past this point.
    unsafe {
        (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
    }
}

/// Bumped by `topology::current()` whenever a thread observes that it has
/// migrated to a different CPU.
pub static TOPOLOGY_EPOCH: core::sync::atomic::AtomicU64 = core::sync::atomic::AtomicU64::new(0);

pub mod topology {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum Affinity {
        SameCore,
        SameCCX,
        SameNUMA,
        Any,
    }

    #[inline(always)]
    #[must_use]
    pub fn current_core() -> u16 {
        current().core_id
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct CpuLevel {
        pub core_id: u16,
        pub ccx_id: u16,
        pub numa_id: u16,
    }

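    /// Returns this thread's CPU location from a thread-local cache, refreshed
    /// when the tick counter advances far enough or the raw CPU id changes.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let here = topology::current();
    /// let _ = (here.core_id, here.ccx_id, here.numa_id);
    /// ```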
    #[inline(always)]
    pub fn current() -> CpuLevel {
        thread_local! {
            static CACHED: core::cell::Cell<(CpuLevel, u64)> = const {
                core::cell::Cell::new((CpuLevel { core_id: 0, ccx_id: 0, numa_id: 0 }, 0))
            };
        }

        let (mut cpu, mut last_refresh) = CACHED.with(std::cell::Cell::get);
        let (now, cpu_id) = crate::utils::get_tick_with_cpu();

        if now.wrapping_sub(last_refresh) > 100_000 || u32::from(cpu.core_id) != cpu_id {
            let next_cpu = current_raw();
            if next_cpu != cpu {
                crate::TOPOLOGY_EPOCH.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                cpu = next_cpu;
            }
            last_refresh = now;
            CACHED.with(|c| c.set((cpu, last_refresh)));
        }
        cpu
    }

    #[inline(always)]
    #[must_use]
    pub fn current_raw() -> CpuLevel {
        #[cfg(target_arch = "x86_64")]
        {
            let (x2apic_id, core_shift, package_shift): (u32, u32, u32);

            unsafe {
                let (eax, edx_v): (u32, u32);
                // CPUID leaf 0BH, sub-leaf 0 (SMT level): EDX = x2APIC id,
                // EAX[4:0] = bits to shift right to reach the next-level id.
                // RBX is LLVM-reserved, so it is saved and restored manually.
                core::arch::asm!(
                    "push rbx",
                    "cpuid",
                    "mov {ebx_out:e}, ebx",
                    "pop rbx",
                    ebx_out = out(reg) _,
                    inout("eax") 0x0B => eax,
                    inout("ecx") 0 => _,
                    out("edx") edx_v,
                );
                core_shift = eax & 0x1F;
                x2apic_id = edx_v;

                let eax_p: u32;
                // Sub-leaf 1 (core level): EAX[4:0] = shift to the package id.
                core::arch::asm!(
                    "push rbx",
                    "cpuid",
                    "mov {ebx_out:e}, ebx",
                    "pop rbx",
                    ebx_out = out(reg) _,
                    inout("eax") 0x0B => eax_p,
                    inout("ecx") 1 => _,
                    out("edx") _,
                );
                package_shift = eax_p & 0x1F;
            }

            let core_id = x2apic_id & ((1 << core_shift) - 1);
            let ccx_id = (x2apic_id >> core_shift) & ((1 << (package_shift - core_shift)) - 1);
            let numa_id = x2apic_id >> package_shift;

            CpuLevel {
                core_id: (core_id & 0xFFFF) as u16,
                ccx_id: (ccx_id & 0xFFFF) as u16,
                numa_id: (numa_id & 0xFFFF) as u16,
            }
        }

        #[cfg(target_arch = "aarch64")]
        {
            let mpidr: u64;
            unsafe {
                core::arch::asm!("mrs {}, mpidr_el1", out(reg) mpidr, options(nomem, nostack, preserves_flags));
            }
            return CpuLevel {
                core_id: (mpidr & 0xFF) as u16,
                ccx_id: ((mpidr >> 8) & 0xFF) as u16,
                numa_id: ((mpidr >> 16) & 0xFF) as u16,
            };
        }

        #[cfg(target_arch = "riscv64")]
        {
            let hart_id: u64;
            unsafe {
                core::arch::asm!("csrr {}, mhartid", out(reg) hart_id, options(nomem, nostack, preserves_flags));
            }
            return CpuLevel {
                core_id: (hart_id & 0xFFFF) as u16,
                ccx_id: (hart_id >> 16) as u16,
                numa_id: 0,
            };
        }

        // 32-bit x86 cannot run the rbx save/restore sequence above, so it
        // takes the generic fallback along with other architectures.
        #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "riscv64")))]
        {
            CpuLevel {
                core_id: 0,
                ccx_id: 0,
                numa_id: 0,
            }
        }
    }
}

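/// Spawns `fut` with the default builder configuration.
///
/// A usage sketch (assumes an initialized runtime; the body is a placeholder):
///
/// ```ignore
/// let handle = spawn(async {
///     yield_now().await;
/// });
/// ```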
#[inline(always)]
pub fn spawn<F: Future + Send + 'static>(fut: F) -> dtact_handle_t {
    SpawnBuilder::<CrossThreadFloat>::new().spawn(fut)
}

#[inline(always)]
#[must_use]
pub const fn spawn_with() -> SpawnBuilder<CrossThreadFloat> {
    SpawnBuilder::new()
}

#[doc(hidden)]
pub mod spawn {
    use super::{CrossThreadFloat, SpawnBuilder};
    #[inline(always)]
    #[must_use]
    #[doc(hidden)]
    pub const fn builder() -> SpawnBuilder<CrossThreadFloat> {
        SpawnBuilder::new()
    }
}

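/// Raw, non-async fiber API: runs a plain closure on a pooled fiber context
/// and allows explicit fiber-to-fiber transfer.
///
/// A usage sketch (the stack-size string is currently unused by
/// `spawn_with_stack`; the closure body is a placeholder):
///
/// ```ignore
/// let h = fiber::spawn_with_stack("64k", || {
///     // runs on its own fiber stack
/// });
/// ```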
pub mod fiber {
    use super::{dtact_handle_t, topology};
    #[inline]
    #[allow(clippy::cast_possible_truncation)]
    pub fn spawn_with_stack<F: FnOnce() + Send + 'static>(
        _stack_size_str: &str,
        f: F,
    ) -> dtact_handle_t {
        let runtime = crate::GLOBAL_RUNTIME
            .get()
            .expect("Dtact Runtime not initialized");
        let pool = &runtime.pool;
        let ctx_id = pool.alloc_context().expect("Context pool exhausted - OOM");
        let ctx_ptr = pool.get_context_ptr(ctx_id);
        #[allow(clippy::cast_possible_truncation)]
        let current_core = topology::current().core_id as usize;

        unsafe {
            (*ctx_ptr).state.store(
                crate::memory_management::FiberStatus::Running as u8,
                core::sync::atomic::Ordering::Release,
            );
            (*ctx_ptr).origin_core = current_core as u16;
            (*ctx_ptr).fiber_index = ctx_id;
            (*ctx_ptr).switch_fn = crate::context_switch::switch_context_same_thread_no_float;

            let f_ptr = (*ctx_ptr).read_buffer_ptr.cast::<F>();
            core::ptr::write(f_ptr, f);
            (*ctx_ptr).invoke_closure = |ptr| {
                let f = core::ptr::read(ptr.cast::<F>());
                f();
            };
            (*ctx_ptr).closure_ptr = f_ptr.cast::<()>();

            // The stack grows down from just below the closure buffer.
            let buffer_start = (*ctx_ptr).read_buffer_ptr as usize;
            let stack_top = (buffer_start & !0xF) - 72;
            let stack_top_ptr = stack_top as *mut u64;

            // Sentinel return address: falling off the entry point aborts.
            core::ptr::write(stack_top_ptr, crate::c_ffi::dtact_abort as *const () as u64);

            let stack_top = stack_top as *mut u8;

            #[cfg(target_arch = "x86_64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[7] = super::fiber_entry_point as *const () as u64; // resume address
                #[cfg(windows)]
                {
                    // Windows: seed the saved stack-bounds bookkeeping slots.
                    let limit = buffer_start.saturating_sub(pool.slot_size);
                    (*ctx_ptr).regs.gprs[10] = buffer_start as u64;
                    (*ctx_ptr).regs.gprs[11] = limit as u64;
                    (*ctx_ptr).regs.gprs[12] = limit as u64;
                    (*ctx_ptr).regs.gprs[13] = !0;
                }
            }
            #[cfg(target_arch = "aarch64")]
            {
                (*ctx_ptr).regs.gprs[12] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[11] = super::fiber_entry_point as *const () as u64; // resume address
                #[cfg(windows)]
                {
                    let limit = buffer_start.saturating_sub(pool.slot_size);
                    (*ctx_ptr).regs.gprs[13] = buffer_start as u64;
                    (*ctx_ptr).regs.gprs[14] = limit as u64;
                    (*ctx_ptr).regs.gprs[15] = limit as u64;
                }
            }
            #[cfg(target_arch = "riscv64")]
            {
                (*ctx_ptr).regs.gprs[0] = stack_top as u64; // saved stack pointer
                (*ctx_ptr).regs.gprs[13] = super::fiber_entry_point as *const () as u64; // resume address
            }
        }

        crate::wake_fiber(current_core, ctx_id);
        dtact_handle_t(u64::from(ctx_id) | ((current_core as u64) << 32))
    }

    /// Wakes the fiber identified by `handle`, then suspends the current
    /// fiber back to its executor. No-op when not called from a fiber.
    #[inline(always)]
    pub fn yield_to(handle: dtact_handle_t) {
        let ctx_ptr = crate::future_bridge::CURRENT_FIBER.with(std::cell::Cell::get);
        if ctx_ptr.is_null() {
            return;
        }

        let target_ctx_id = (handle.0 & 0xFFFF_FFFF) as u32;
        let target_core_id = ((handle.0 >> 32) & 0xFFFF) as usize;

        crate::wake_fiber(target_core_id, target_ctx_id);

        unsafe {
            let ctx = &mut *ctx_ptr;
            ctx.state.store(
                crate::memory_management::FiberStatus::Suspending as u8,
                core::sync::atomic::Ordering::Release,
            );
            (ctx.switch_fn)(&raw mut ctx.regs, &raw const ctx.executor_regs);
        }
    }
}

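/// Best-effort hardware hints, compiled in with the `hw-acceleration` feature:
/// cache-line demotion and user-interrupt signaling, each lowered to the
/// closest instruction the target architecture offers.
///
/// A usage sketch (`shared` stands in for any cache line just published to
/// another core):
///
/// ```ignore
/// hw::cldemote(&shared as *const _);
/// ```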
#[cfg(feature = "hw-acceleration")]
pub mod hw {
    #[inline(always)]
    pub fn cldemote<T>(ptr: *const T) {
        #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
        unsafe {
            core::arch::asm!("cldemote [{}]", in(reg) ptr);
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!("dc cvac, {}", in(reg) ptr);
        }
        #[cfg(target_arch = "riscv64")]
        unsafe {
            core::arch::asm!("cbo.clean 0({0})", in(reg) ptr);
        }
    }

    #[inline(always)]
    pub fn uintr_signal(target_cpu: usize) {
        #[cfg(target_arch = "x86_64")]
        unsafe {
            core::arch::asm!(
                "mov rax, {}",
                ".byte 0xf3, 0x0f, 0xc7, 0xf0", // SENDUIPI rax
                in(reg) target_cpu as u64,
                out("rax") _,
                options(nostack, preserves_flags),
            );
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!("sev", options(nostack, preserves_flags));
        }
        #[cfg(target_arch = "riscv64")]
        unsafe {
            core::arch::asm!("csrw uipi, {0}", in(reg) target_cpu);
        }
    }
}

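/// Cooperatively yields: wakes the current task's waker and returns `Pending`
/// exactly once, so the executor can run other fibers before polling again.
///
/// A usage sketch inside a spawned future:
///
/// ```ignore
/// spawn(async {
///     for _ in 0..1024 {
///         /* ... */
///         yield_now().await;
///     }
/// });
/// ```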
#[inline(always)]
pub async fn yield_now() {
    struct YieldNow(bool);
    impl Future for YieldNow {
        type Output = ();
        #[inline(always)]
        fn poll(
            mut self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            if self.0 {
                core::task::Poll::Ready(())
            } else {
                self.0 = true;
                cx.waker().wake_by_ref();
                core::task::Poll::Pending
            }
        }
    }
    YieldNow(false).await;
}

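/// Wakes the fiber identified by `handle` (after stripping the validity flag
/// in bit 63), then yields the current task so the target gets a chance to run.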
#[inline(always)]
pub async fn yield_to(handle: dtact_handle_t) {
    let handle_val = handle.0 & !(1 << 63); // strip the validity flag
    let target_ctx_id = (handle_val & 0xFFFF_FFFF) as u32;
    let target_core_id = ((handle_val >> 32) & 0xFFFF) as usize;
    crate::wake_fiber(target_core_id, target_ctx_id);
    yield_now().await;
}

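/// Runtime tuning knobs.
///
/// A usage sketch (core index and threshold value are illustrative):
///
/// ```ignore
/// config::set_deflection_threshold(0, 8);
/// ```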
pub mod config {
    use core::sync::atomic::Ordering;
    #[inline(always)]
    pub fn set_deflection_threshold(core_id: usize, threshold: u8) {
        if let Some(runtime) = crate::GLOBAL_RUNTIME.get()
            && core_id < runtime.scheduler.workers.len()
        {
            unsafe {
                let worker = &*runtime.scheduler.workers[core_id].get();
                worker
                    .deflection_threshold
                    .store(threshold, Ordering::Release);
            }
        }
    }
}

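/// Extension trait for driving any future to completion synchronously on the
/// runtime's future bridge.
///
/// A usage sketch (assumes an initialized runtime):
///
/// ```ignore
/// let answer = async { 21 * 2 }.wait();
/// assert_eq!(answer, 42);
/// ```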
pub trait DtactWaitExt {
    type Output;
    fn wait(self) -> Self::Output;
}

impl<F: Future> DtactWaitExt for F {
    type Output = F::Output;
    #[inline(always)]
    fn wait(self) -> Self::Output {
        crate::future_bridge::wait(self)
    }
}