photon_ring/wait.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wait strategies for blocking receive operations.
5//!
6//! [`WaitStrategy`] controls how a consumer thread waits when no message is
7//! available. All strategies are `no_std` compatible.
8//!
9//! | Strategy | Latency | CPU usage | Best for |
10//! |---|---|---|---|
11//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
12//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
13//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
//! | `Adaptive` | Auto-scaling | Varies | General purpose |
//! | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
//! | `MonitorWaitFallback` | Bounded (TPAUSE deadline) | Near-zero | WAITPKG CPUs, no address |
15//!
16//! # Platform-specific optimizations
17//!
18//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
19//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
20//! `YIELD`). `WFE` puts the core into a low-power state until an event —
21//! such as a cache line invalidation from the publisher's store — wakes it.
22//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
23//! so the first `WFE` doesn't block unconditionally.
24//!
25//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
26//! standard spin-wait hint (~140 cycles on Skylake+).
27//!
// NOTE: Intel Tremont+ CPUs support UMWAIT/TPAUSE instructions for
// user-mode cache line monitoring, allowing near-zero latency wakeup
// without burning CPU. They are used by the `MonitorWait` and
// `MonitorWaitFallback` strategies below, gated behind runtime CPUID
// feature detection (WAITPKG) since they only exist on recent Intel parts.
32
/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
///
/// All variants are `no_std` compatible — no OS thread primitives required.
///
/// | Strategy | Latency | CPU usage | Best for |
/// |---|---|---|---|
/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
/// | `Adaptive` | Auto-scaling | Varies | General purpose |
/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitStrategy {
    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
    BusySpin,

    /// Spin with a CPU relaxation hint between iterations.
    ///
    /// On x86: `core::hint::spin_loop()` (PAUSE), which yields the CPU
    /// pipeline to the SMT sibling and reduces power vs `BusySpin`.
    /// On aarch64: the implementation uses SEVL + WFE (not `spin_loop`'s
    /// YIELD) to sleep in a low-power state — see the module docs.
    YieldSpin,

    /// Exponential backoff spin: the number of PAUSE (x86) / WFE (aarch64)
    /// hints emitted per call doubles with the iteration count, capped at
    /// 64 per call. Good for consumers that may be idle for extended
    /// periods without burning a full core.
    BackoffSpin,

    /// Three-phase escalation: bare spin for `spin_iters` iterations,
    /// then one PAUSE per iteration for `yield_iters` iterations, then
    /// bursts of eight PAUSEs per iteration.
    Adaptive {
        /// Number of bare-spin iterations before escalating to PAUSE.
        spin_iters: u32,
        /// Number of PAUSE iterations before entering deep backoff.
        yield_iters: u32,
    },

    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
    ///
    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
    /// state until a write to the monitored cache line wakes it. Near-zero
    /// power consumption with ~30 ns wakeup latency.
    ///
    /// Falls back to a `YieldSpin`-equivalent PAUSE on x86 CPUs without
    /// WAITPKG support. On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
    ///
    /// `addr` must point to the stamp field of the slot being waited on.
    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
    /// `AtomicU64` reference. For callers that cannot provide an address,
    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
    ///
    /// # Safety (of direct construction)
    ///
    /// Constructing this variant directly with an arbitrary pointer can
    /// cause hardware faults on WAITPKG-capable CPUs. Prefer the safe
    /// constructor [`WaitStrategy::monitor_wait`].
    MonitorWait {
        /// Pointer to the memory location to monitor. Must remain valid
        /// for the duration of the wait.
        addr: *const u8,
    },

    /// Like `MonitorWait` but without an explicit address.
    ///
    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
    /// C0.1 state) for low-power waiting without address monitoring.
    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
    MonitorWaitFallback,
}
102
103impl WaitStrategy {
104 /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
105 /// an `AtomicU64` reference (typically a slot's stamp field).
106 #[inline]
107 pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
108 WaitStrategy::MonitorWait {
109 addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
110 }
111 }
112}
113
114impl Default for WaitStrategy {
115 fn default() -> Self {
116 WaitStrategy::Adaptive {
117 spin_iters: 64,
118 yield_iters: 64,
119 }
120 }
121}
122
/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
///
/// CPUID.(EAX=7, ECX=0):ECX bit 5 is the WAITPKG feature flag.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[inline]
fn has_waitpkg() -> bool {
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{__cpuid, __cpuid_count};
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__cpuid, __cpuid_count};

    // SAFETY: CPUID is always available on x86_64; on 32-bit x86 this
    // assumes an i586+ CPU (CPUID present), which any target running
    // this code realistically guarantees.
    unsafe {
        // Leaf 7 is only defined when the maximum basic leaf
        // (CPUID.0:EAX) is >= 7. Querying beyond the max leaf returns
        // data from the highest basic leaf, which could spuriously set
        // bit 5 — so validate the leaf range first.
        if __cpuid(0).eax < 7 {
            return false;
        }
        __cpuid_count(7, 0).ecx & (1 << 5) != 0
    }
}
141
/// Cached WAITPKG support flag: 0 = unknown (not yet probed),
/// 1 = not supported, 2 = supported.
///
/// Evaluated once via a racy init pattern (benign data race — worst case
/// is that several threads each run CPUID once and store the same value).
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
146
147/// 0 = unknown, 1 = not supported, 2 = supported.
148#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
149#[inline]
150fn waitpkg_supported() -> bool {
151 let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
152 if cached != 0 {
153 return cached == 2;
154 }
155 let supported = has_waitpkg();
156 WAITPKG_SUPPORT.store(
157 if supported { 2 } else { 1 },
158 core::sync::atomic::Ordering::Relaxed,
159 );
160 supported
161}
162
163// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
164// These are encoded via raw bytes because stable Rust doesn't expose them
165// as intrinsics yet.
166//
167// UMONITOR: sets up address monitoring (F3 0F AE /6)
168// UMWAIT: wait until store to monitored line or timeout (F2 0F AE /6)
169// TPAUSE: timed pause without address monitoring (66 0F AE /6)
170//
171// EDX:EAX = absolute TSC deadline. The instruction exits when either:
172// (a) a store hits the monitored cache line (UMWAIT only), or
173// (b) TSC >= deadline, or
174// (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
175//
176// We set the deadline ~100µs in the future — long enough to actually
177// enter a low-power state, short enough to bound worst-case latency
178// if the wakeup event is missed (e.g., the store happened between
179// UMONITOR and UMWAIT).
#[cfg(target_arch = "x86_64")]
mod umwait {
    /// Read the TSC and return an absolute deadline ~100µs in the future,
    /// split into the (EAX, EDX) halves that UMWAIT/TPAUSE expect.
    /// On a 3 GHz CPU, 100µs ≈ 300,000 cycles.
    #[inline(always)]
    fn deadline_100us() -> (u32, u32) {
        // SAFETY: RDTSC is available on all x86_64 CPUs.
        // (The previous version also issued a raw `rdtsc` asm block whose
        // outputs were discarded — a dead serializing read; the intrinsic
        // alone is sufficient.)
        let tsc = unsafe { core::arch::x86_64::_rdtsc() };
        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
    }

    /// Set up monitoring on the cache line containing `addr`.
    /// The CPU will track writes to this line until UMWAIT is called.
    ///
    /// # Safety
    /// The CPU must support WAITPKG and `addr` must be a valid address;
    /// otherwise the instruction faults (#UD).
    #[inline(always)]
    pub(super) unsafe fn umonitor(addr: *const u8) {
        // UMONITOR rax: F3 0F AE /6 → F3 0F AE F0 (mod=11, rm=rax).
        // Encoded as raw bytes because stable Rust has no intrinsic yet.
        // UMONITOR does not modify flags, so `preserves_flags` is correct.
        core::arch::asm!(
            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
            in("rax") addr,
            options(nostack, preserves_flags),
        );
    }

    /// Wait for a write to the monitored address or timeout.
    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
    /// Returns quickly (~30 ns) when the monitored cache line is written.
    /// Deadline is set ~100µs in the future as a safety bound.
    ///
    /// # Safety
    /// The CPU must support WAITPKG; UMONITOR must have been executed
    /// first for the wait to be address-triggered.
    #[inline(always)]
    pub(super) unsafe fn umwait(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // UMWAIT ecx: F2 0F AE /6 → F2 0F AE F1 (mod=11, rm=ecx).
        // EDX:EAX = absolute TSC deadline.
        // NOTE: UMWAIT writes CF (set when it exited due to the OS
        // time-limit, cleared otherwise), so `preserves_flags` must NOT
        // be declared — the compiler may not assume flags survive.
        core::arch::asm!(
            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }

    /// Timed pause without address monitoring. Enters C0.1 state
    /// until the deadline (~100µs from now).
    /// `ctrl` = 0 for C0.2, 1 for C0.1.
    ///
    /// # Safety
    /// The CPU must support WAITPKG; otherwise the instruction faults.
    #[inline(always)]
    pub(super) unsafe fn tpause(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // TPAUSE ecx: 66 0F AE /6 → 66 0F AE F1 (mod=11, rm=ecx).
        // Like UMWAIT, TPAUSE writes CF on exit — no `preserves_flags`.
        core::arch::asm!(
            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }
}
249
// SAFETY: `MonitorWait` stores a raw `*const u8`, which makes the enum
// `!Send`/`!Sync` by default. The pointer is never dereferenced by safe
// code — it is only handed to UMONITOR inside wait(), which is called
// synchronously by recv_with() on the waiting thread. Soundness therefore
// rests on the caller keeping the pointed-to memory valid for the duration
// of the wait; the safe constructor `WaitStrategy::monitor_wait` derives
// the pointer from a live borrow, which upholds this at construction time.
unsafe impl Send for WaitStrategy {}
unsafe impl Sync for WaitStrategy {}
257
258impl WaitStrategy {
259 /// Execute one wait iteration. Called by `recv_with` on each loop when
260 /// `try_recv` returns `Empty`.
261 ///
262 /// `iter` is the zero-based iteration count since the last successful
263 /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
264 #[inline]
265 pub(crate) fn wait(&self, iter: u32) {
266 match self {
267 WaitStrategy::BusySpin => {
268 // No hint — pure busy loop. Fastest wakeup, highest power.
269 }
270 WaitStrategy::YieldSpin => {
271 // On aarch64: SEVL + WFE puts the core into a low-power
272 // state until a cache-line event wakes it. SEVL sets the
273 // local event register so the first WFE returns immediately
274 // (avoids unconditional blocking).
275 // On x86: PAUSE yields the pipeline to the SMT sibling.
276 #[cfg(target_arch = "aarch64")]
277 unsafe {
278 core::arch::asm!("sevl", options(nomem, nostack));
279 core::arch::asm!("wfe", options(nomem, nostack));
280 }
281 #[cfg(not(target_arch = "aarch64"))]
282 core::hint::spin_loop();
283 }
284 WaitStrategy::BackoffSpin => {
285 // Exponential backoff: more iterations as we wait longer.
286 // On aarch64: WFE sleeps until a cache-line event, making
287 // each iteration near-zero power. On x86: PAUSE yields the
288 // pipeline with ~140 cycle delay per iteration.
289 let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
290 for _ in 0..pauses {
291 #[cfg(target_arch = "aarch64")]
292 unsafe {
293 core::arch::asm!("wfe", options(nomem, nostack));
294 }
295 #[cfg(not(target_arch = "aarch64"))]
296 core::hint::spin_loop();
297 }
298 }
299 WaitStrategy::Adaptive {
300 spin_iters,
301 yield_iters,
302 } => {
303 if iter < *spin_iters {
304 // Phase 1: bare spin — fastest wakeup.
305 } else if iter < spin_iters + yield_iters {
306 // Phase 2: PAUSE-spin — yields pipeline.
307 core::hint::spin_loop();
308 } else {
309 // Phase 3: deep backoff — multiple PAUSE per iteration.
310 for _ in 0..8 {
311 core::hint::spin_loop();
312 }
313 }
314 }
315 WaitStrategy::MonitorWait { addr } => {
316 // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
317 // power wakeup on cache-line write (~30 ns wakeup latency).
318 // Falls back to PAUSE on CPUs without WAITPKG.
319 #[cfg(target_arch = "x86_64")]
320 {
321 if waitpkg_supported() {
322 unsafe {
323 umwait::umonitor(*addr);
324 umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
325 }
326 } else {
327 core::hint::spin_loop();
328 }
329 }
330 // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
331 // monitors cache-line invalidation events).
332 #[cfg(target_arch = "aarch64")]
333 {
334 let _ = addr;
335 unsafe {
336 core::arch::asm!("sevl", options(nomem, nostack));
337 core::arch::asm!("wfe", options(nomem, nostack));
338 }
339 }
340 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
341 {
342 let _ = addr;
343 core::hint::spin_loop();
344 }
345 }
346 WaitStrategy::MonitorWaitFallback => {
347 // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
348 // address monitoring — still saves power vs PAUSE.
349 // On aarch64: SEVL + WFE.
350 // Elsewhere: PAUSE.
351 #[cfg(target_arch = "x86_64")]
352 {
353 if waitpkg_supported() {
354 unsafe {
355 umwait::tpause(1); // C0.1
356 }
357 } else {
358 core::hint::spin_loop();
359 }
360 }
361 #[cfg(target_arch = "aarch64")]
362 unsafe {
363 core::arch::asm!("sevl", options(nomem, nostack));
364 core::arch::asm!("wfe", options(nomem, nostack));
365 }
366 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
367 core::hint::spin_loop();
368 }
369 }
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    // Default should be the general-purpose Adaptive(64, 64) strategy.
    #[test]
    fn default_is_adaptive() {
        let expected = WaitStrategy::Adaptive {
            spin_iters: 64,
            yield_iters: 64,
        };
        assert_eq!(WaitStrategy::default(), expected);
    }

    // Smoke test: a busy spin never blocks, regardless of iteration count.
    #[test]
    fn busy_spin_returns_immediately() {
        let ws = WaitStrategy::BusySpin;
        (0..1000).for_each(|iter| ws.wait(iter));
    }

    // Smoke test: yield spin completes every iteration.
    #[test]
    fn yield_spin_returns() {
        let ws = WaitStrategy::YieldSpin;
        (0..100).for_each(|iter| ws.wait(iter));
    }

    // Smoke test: backoff spin completes through its escalation range.
    #[test]
    fn backoff_spin_returns() {
        let ws = WaitStrategy::BackoffSpin;
        (0..20).for_each(|iter| ws.wait(iter));
    }

    // Drive an Adaptive strategy through all three phases (bare spin,
    // PAUSE-spin, deep backoff).
    #[test]
    fn adaptive_phases() {
        let ws = WaitStrategy::Adaptive {
            spin_iters: 4,
            yield_iters: 4,
        };
        (0..20).for_each(|iter| ws.wait(iter));
    }

    // WaitStrategy is Copy + Clone; both paths yield equal values.
    #[test]
    fn clone_and_copy() {
        let ws = WaitStrategy::BusySpin;
        let by_copy = ws;
        #[allow(clippy::clone_on_copy)]
        let by_clone = ws.clone();
        assert_eq!(ws, by_copy);
        assert_eq!(ws, by_clone);
    }

    // Debug output names the variant.
    #[test]
    fn debug_format() {
        use alloc::format;
        let rendered = format!("{:?}", WaitStrategy::BusySpin);
        assert!(rendered.contains("BusySpin"));
    }
}
441}