// photon_ring/wait.rs
// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0

//! Wait strategies for blocking receive operations.
//!
//! [`WaitStrategy`] controls how a consumer thread waits when no message is
//! available. All strategies are `no_std` compatible.
//!
//! | Strategy | Latency | CPU usage | Best for |
//! |---|---|---|---|
//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
//! | `Adaptive` | Auto-scaling | Varies | General purpose |
//! | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
//!
//! # Platform-specific optimizations
//!
//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
//! `YIELD`). `WFE` puts the core into a low-power state until an event —
//! such as a cache line invalidation from the publisher's store — wakes it.
//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
//! so the first `WFE` doesn't block unconditionally.
//!
//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
//! standard spin-wait hint (~140 cycles on Skylake+).
//!
// NOTE: Intel Tremont+ CPUs support UMWAIT/TPAUSE (WAITPKG) instructions
// for user-mode cache line monitoring, giving near-zero-latency wakeup
// without burning CPU. This IS implemented below as `MonitorWait` /
// `MonitorWaitFallback`, with runtime CPUID (WAITPKG) feature detection
// and a PAUSE fallback on CPUs that lack the feature.
/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
///
/// All variants are `no_std` compatible — no OS thread primitives required.
///
/// | Strategy | Latency | CPU usage | Best for |
/// |---|---|---|---|
/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
/// | `Adaptive` | Auto-scaling | Varies | General purpose |
/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitStrategy {
    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
    BusySpin,

    /// Spin with `core::hint::spin_loop()` (PAUSE on x86, YIELD on ARM)
    /// between iterations. Yields the CPU pipeline to the SMT sibling
    /// and reduces power consumption vs `BusySpin`.
    ///
    /// On aarch64 this module uses SEVL+WFE instead of the YIELD hint —
    /// see the module docs.
    YieldSpin,

    /// Exponential backoff spin. Starts with bare spins, then escalates
    /// to PAUSE-based spins with increasing delays. Good for consumers
    /// that may be idle for extended periods without burning a full core.
    BackoffSpin,

    /// Three-phase escalation: bare spin for `spin_iters` iterations,
    /// then PAUSE-spin for `yield_iters`, then repeated PAUSE bursts.
    Adaptive {
        /// Number of bare-spin iterations before escalating to PAUSE.
        spin_iters: u32,
        /// Number of PAUSE iterations before entering deep backoff.
        yield_iters: u32,
    },

    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
    ///
    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
    /// state until a write to the monitored cache line wakes it. Near-zero
    /// power consumption with ~30 ns wakeup latency.
    ///
    /// Falls back to `YieldSpin` behavior on x86 CPUs without WAITPKG.
    /// On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
    ///
    /// `addr` must point to the stamp field of the slot being waited on.
    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
    /// `AtomicU64` reference. For callers that cannot provide an address,
    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
    ///
    /// # Safety (of direct construction)
    ///
    /// The pointer must remain valid for the duration of the wait.
    /// Constructing this variant directly with a pointer to stack memory
    /// or memory that may be deallocated is undefined behavior. Always
    /// prefer the safe constructor [`WaitStrategy::monitor_wait()`].
    MonitorWait {
        /// Pointer to the memory location to monitor. Must remain valid
        /// for the duration of the wait.
        addr: *const u8,
    },

    /// Like `MonitorWait` but without an explicit address.
    ///
    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
    /// C0.1 state) for low-power waiting without address monitoring.
    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
    MonitorWaitFallback,
}
103
104impl WaitStrategy {
105 /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
106 /// an `AtomicU64` reference (typically a slot's stamp field).
107 #[inline]
108 pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
109 WaitStrategy::MonitorWait {
110 addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
111 }
112 }
113}
114
115impl Default for WaitStrategy {
116 fn default() -> Self {
117 WaitStrategy::Adaptive {
118 spin_iters: 64,
119 yield_iters: 64,
120 }
121 }
122}
123
/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
///
/// CPUID leaf 7, sub-leaf 0, ECX bit 5.
///
/// Returns `false` when the CPU does not report leaf 7 at all (maximum
/// basic leaf < 7) instead of reading an out-of-range leaf, whose result
/// is undefined per the Intel SDM.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[inline]
fn has_waitpkg() -> bool {
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{__cpuid, __cpuid_count};
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__cpuid, __cpuid_count};

    // SAFETY: CPUID is unprivileged and present on every x86_64 CPU (and
    // every 32-bit x86 CPU this crate realistically targets); the intrinsic
    // has no memory-safety preconditions. The original code called the
    // `unsafe fn` intrinsic without an `unsafe` block, which does not compile.
    unsafe {
        // Leaf 7 is only defined when the maximum basic leaf is >= 7.
        if __cpuid(0).eax < 7 {
            return false;
        }
        __cpuid_count(7, 0).ecx & (1 << 5) != 0
    }
}
141
/// Cached WAITPKG support flag. Evaluated once via a racy init pattern
/// (benign data race — worst case is redundant CPUID calls on first access).
///
/// Encoding: 0 = unknown (not yet probed), 1 = not supported, 2 = supported.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
146
147/// 0 = unknown, 1 = not supported, 2 = supported.
148#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
149#[inline]
150fn waitpkg_supported() -> bool {
151 let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
152 if cached != 0 {
153 return cached == 2;
154 }
155 let supported = has_waitpkg();
156 WAITPKG_SUPPORT.store(
157 if supported { 2 } else { 1 },
158 core::sync::atomic::Ordering::Relaxed,
159 );
160 supported
161}
162
// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
// These are encoded via raw bytes because stable Rust doesn't expose them
// as intrinsics yet.
//
// UMONITOR: sets up address monitoring (F3 0F AE /6)
// UMWAIT:   wait until store to monitored line or timeout (F2 0F AE /6)
// TPAUSE:   timed pause without address monitoring (66 0F AE /6)
//
// EDX:EAX = absolute TSC deadline. The instruction exits when either:
//   (a) a store hits the monitored cache line (UMWAIT only), or
//   (b) TSC >= deadline, or
//   (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
//
// We set the deadline ~100µs in the future — long enough to actually
// enter a low-power state, short enough to bound worst-case latency
// if the wakeup event is missed (e.g., the store happened between
// UMONITOR and UMWAIT).
//
// Flags: UMWAIT and TPAUSE set CF to indicate why they returned (CF=1 when
// cut short by the OS timeout, CF=0 otherwise — Intel SDM Vol. 2). Their
// asm blocks therefore must NOT claim `preserves_flags`; the previous
// version did, which let the compiler assume EFLAGS survived the asm
// (unsound). UMONITOR does not affect flags, so its claim is correct.
#[cfg(target_arch = "x86_64")]
mod umwait {
    /// Read the TSC and return an absolute deadline ~100µs in the future,
    /// split as `(eax, edx)` halves the way UMWAIT/TPAUSE consume it.
    ///
    /// Note: The 300,000 cycle offset assumes ~3 GHz TSC frequency. On slower
    /// CPUs (1 GHz), this becomes ~300 µs; on faster CPUs (5 GHz), ~60 µs.
    /// The deadline is a safety bound, not a precision target.
    #[inline(always)]
    fn deadline_100us() -> (u32, u32) {
        // SAFETY: RDTSC is unprivileged and available on all x86_64 CPUs
        // (unless the OS sets CR4.TSD, which mainstream OSes do not).
        let tsc = unsafe { core::arch::x86_64::_rdtsc() };
        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
    }

    /// Set up monitoring on the cache line containing `addr`.
    /// The CPU will track writes to this line until UMWAIT is called.
    ///
    /// # Safety
    /// Caller must ensure the CPU supports WAITPKG (else #UD) and that
    /// `addr` is a valid address for the duration of the monitor.
    #[inline(always)]
    pub(super) unsafe fn umonitor(addr: *const u8) {
        // UMONITOR rax: F3 0F AE /6 (modrm 0xF0 = rax).
        // UMONITOR does not modify EFLAGS, so preserves_flags is sound here.
        core::arch::asm!(
            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
            in("rax") addr,
            options(nostack, preserves_flags),
        );
    }

    /// Wait for a write to the monitored address or timeout.
    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
    /// Returns quickly (~30 ns) when the monitored cache line is written.
    /// Deadline is set ~100µs in the future as a safety bound.
    ///
    /// # Safety
    /// Caller must ensure the CPU supports WAITPKG (else #UD) and that
    /// UMONITOR was executed beforehand on this logical processor.
    #[inline(always)]
    pub(super) unsafe fn umwait(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // UMWAIT ecx: F2 0F AE /6 (modrm 0xF1 = ecx); edx:eax = deadline.
        // UMWAIT writes CF — do NOT claim preserves_flags.
        core::arch::asm!(
            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }

    /// Timed pause without address monitoring. Enters C0.1 state
    /// until the deadline (~100µs from now).
    /// `ctrl` = 0 for C0.2, 1 for C0.1.
    ///
    /// # Safety
    /// Caller must ensure the CPU supports WAITPKG (else #UD).
    #[inline(always)]
    pub(super) unsafe fn tpause(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // TPAUSE ecx: 66 0F AE /6 (modrm 0xF1 = ecx); edx:eax = deadline.
        // TPAUSE writes CF — do NOT claim preserves_flags.
        core::arch::asm!(
            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }
}
245
// SAFETY: MonitorWait stores a raw pointer for the monitored address,
// which makes the enum `!Send`/`!Sync` by default. The pointer is only
// used during wait() and must remain valid for the duration of the wait.
// Since WaitStrategy is passed by value to recv_with() and wait() is
// called synchronously, this is safe as long as the caller ensures the
// pointed-to memory outlives the wait (the contract documented on the
// `MonitorWait` variant and upheld by `monitor_wait()`).
unsafe impl Send for WaitStrategy {}
unsafe impl Sync for WaitStrategy {}
253
254impl WaitStrategy {
255 /// Execute one wait iteration. Called by `recv_with` on each loop when
256 /// `try_recv` returns `Empty`.
257 ///
258 /// `iter` is the zero-based iteration count since the last successful
259 /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
260 #[inline]
261 pub(crate) fn wait(&self, iter: u32) {
262 match self {
263 WaitStrategy::BusySpin => {
264 // No hint — pure busy loop. Fastest wakeup, highest power.
265 }
266 WaitStrategy::YieldSpin => {
267 // On aarch64: SEVL + WFE puts the core into a low-power
268 // state until a cache-line event wakes it. SEVL sets the
269 // local event register so the first WFE returns immediately
270 // (avoids unconditional blocking).
271 // On x86: PAUSE yields the pipeline to the SMT sibling.
272 #[cfg(target_arch = "aarch64")]
273 unsafe {
274 core::arch::asm!("sevl", options(nomem, nostack));
275 core::arch::asm!("wfe", options(nomem, nostack));
276 }
277 #[cfg(not(target_arch = "aarch64"))]
278 core::hint::spin_loop();
279 }
280 WaitStrategy::BackoffSpin => {
281 // Exponential backoff: more iterations as we wait longer.
282 // On aarch64: WFE sleeps until a cache-line event, making
283 // each iteration near-zero power. On x86: PAUSE yields the
284 // pipeline with ~140 cycle delay per iteration.
285 let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
286 for _ in 0..pauses {
287 #[cfg(target_arch = "aarch64")]
288 unsafe {
289 core::arch::asm!("wfe", options(nomem, nostack));
290 }
291 #[cfg(not(target_arch = "aarch64"))]
292 core::hint::spin_loop();
293 }
294 }
295 WaitStrategy::Adaptive {
296 spin_iters,
297 yield_iters,
298 } => {
299 if iter < *spin_iters {
300 // Phase 1: bare spin — fastest wakeup.
301 } else if iter < spin_iters + yield_iters {
302 // Phase 2: PAUSE-spin — yields pipeline.
303 core::hint::spin_loop();
304 } else {
305 // Phase 3: deep backoff — multiple PAUSE per iteration.
306 for _ in 0..8 {
307 core::hint::spin_loop();
308 }
309 }
310 }
311 WaitStrategy::MonitorWait { addr } => {
312 // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
313 // power wakeup on cache-line write (~30 ns wakeup latency).
314 // Falls back to PAUSE on CPUs without WAITPKG.
315 #[cfg(target_arch = "x86_64")]
316 {
317 if waitpkg_supported() {
318 unsafe {
319 umwait::umonitor(*addr);
320 umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
321 }
322 } else {
323 core::hint::spin_loop();
324 }
325 }
326 // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
327 // monitors cache-line invalidation events).
328 #[cfg(target_arch = "aarch64")]
329 {
330 let _ = addr;
331 unsafe {
332 core::arch::asm!("sevl", options(nomem, nostack));
333 core::arch::asm!("wfe", options(nomem, nostack));
334 }
335 }
336 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
337 {
338 let _ = addr;
339 core::hint::spin_loop();
340 }
341 }
342 WaitStrategy::MonitorWaitFallback => {
343 // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
344 // address monitoring — still saves power vs PAUSE.
345 // On aarch64: SEVL + WFE.
346 // Elsewhere: PAUSE.
347 #[cfg(target_arch = "x86_64")]
348 {
349 if waitpkg_supported() {
350 unsafe {
351 umwait::tpause(1); // C0.1
352 }
353 } else {
354 core::hint::spin_loop();
355 }
356 }
357 #[cfg(target_arch = "aarch64")]
358 unsafe {
359 core::arch::asm!("sevl", options(nomem, nostack));
360 core::arch::asm!("wfe", options(nomem, nostack));
361 }
362 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
363 core::hint::spin_loop();
364 }
365 }
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// The default strategy must be `Adaptive` with 64/64 thresholds.
    #[test]
    fn default_is_adaptive() {
        assert_eq!(
            WaitStrategy::default(),
            WaitStrategy::Adaptive {
                spin_iters: 64,
                yield_iters: 64,
            }
        );
    }

    /// `BusySpin::wait` must return promptly at any iteration count.
    #[test]
    fn busy_spin_returns_immediately() {
        let strategy = WaitStrategy::BusySpin;
        (0..1000).for_each(|i| strategy.wait(i));
    }

    /// `YieldSpin::wait` must return promptly at any iteration count.
    #[test]
    fn yield_spin_returns() {
        let strategy = WaitStrategy::YieldSpin;
        (0..100).for_each(|i| strategy.wait(i));
    }

    /// `BackoffSpin::wait` must return even at its deepest backoff level.
    #[test]
    fn backoff_spin_returns() {
        let strategy = WaitStrategy::BackoffSpin;
        (0..20).for_each(|i| strategy.wait(i));
    }

    /// Drive `Adaptive` through all three phases (bare, PAUSE, deep).
    #[test]
    fn adaptive_phases() {
        let strategy = WaitStrategy::Adaptive {
            spin_iters: 4,
            yield_iters: 4,
        };
        (0..20).for_each(|i| strategy.wait(i));
    }

    /// `Copy` and `Clone` must produce values equal to the original.
    #[test]
    fn clone_and_copy() {
        let original = WaitStrategy::BusySpin;
        let copied = original;
        #[allow(clippy::clone_on_copy)]
        let cloned = original.clone();
        assert_eq!(original, copied);
        assert_eq!(original, cloned);
    }

    /// `Debug` output must name the variant.
    #[test]
    fn debug_format() {
        use alloc::format;
        let rendered = format!("{:?}", WaitStrategy::BusySpin);
        assert!(rendered.contains("BusySpin"));
    }
}
437}