// photon_ring/wait.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wait strategies for blocking receive operations.
5//!
6//! [`WaitStrategy`] controls how a consumer thread waits when no message is
7//! available. All strategies are `no_std` compatible.
8//!
//! | Strategy | Latency | CPU usage | Best for |
//! |---|---|---|---|
//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
//! | `Adaptive` | Auto-scaling | Varies | General purpose |
//! | `MonitorWait` | Near-zero (~30 ns on Intel WAITPKG) | Near-zero | Intel Tremont+/Alder Lake+ |
//! | `MonitorWaitFallback` | Low | Near-zero on WAITPKG CPUs | Monitored address unavailable |
14//! | `Adaptive` | Auto-scaling | Varies | General purpose |
15//!
16//! # Platform-specific optimizations
17//!
18//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
19//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
20//! `YIELD`). `WFE` puts the core into a low-power state until an event —
21//! such as a cache line invalidation from the publisher's store — wakes it.
22//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
23//! so the first `WFE` doesn't block unconditionally.
24//!
25//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
26//! standard spin-wait hint (~140 cycles on Skylake+).
27//!
// NOTE: Intel Tremont+ CPUs support UMWAIT/TPAUSE instructions for
// user-mode cache line monitoring, allowing near-zero latency wakeup
// without burning CPU. These are implemented below (`mod umwait`) behind
// runtime CPUID feature detection (WAITPKG: leaf 7, sub-leaf 0, ECX bit 5)
// and are used by the `MonitorWait` / `MonitorWaitFallback` strategies.
32
/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
///
/// All variants are `no_std` compatible — no OS thread primitives required.
///
/// | Strategy | Latency | CPU usage | Best for |
/// |---|---|---|---|
/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
/// | `Adaptive` | Auto-scaling | Varies | General purpose |
/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitStrategy {
    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
    BusySpin,

    /// Spin with `core::hint::spin_loop()` (PAUSE on x86) between
    /// iterations — on aarch64 this crate uses SEVL+WFE instead (see
    /// module docs). Yields the CPU pipeline to the SMT sibling and
    /// reduces power consumption vs `BusySpin`.
    YieldSpin,

    /// Exponential backoff spin. Starts with short hint bursts, doubling
    /// the burst length each iteration (capped at 64). Good for consumers
    /// that may be idle for extended periods without burning a full core.
    BackoffSpin,

    /// Three-phase escalation: bare spin for `spin_iters` iterations,
    /// then PAUSE-spin for `yield_iters`, then repeated PAUSE bursts.
    Adaptive {
        /// Number of bare-spin iterations before escalating to PAUSE.
        spin_iters: u32,
        /// Number of PAUSE iterations before entering deep backoff.
        yield_iters: u32,
    },

    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
    ///
    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
    /// state until a write to the monitored cache line wakes it. Near-zero
    /// power consumption with ~30 ns wakeup latency.
    ///
    /// Falls back to `YieldSpin` behavior on x86 CPUs without WAITPKG.
    /// On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
    ///
    /// `addr` must point to the stamp field of the slot being waited on.
    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
    /// `AtomicU64` reference. For callers that cannot provide an address,
    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
    ///
    /// # Safety (of direct construction)
    ///
    /// Constructing this variant directly with an arbitrary pointer can
    /// cause hardware faults on WAITPKG-capable CPUs. Prefer the safe
    /// constructor [`WaitStrategy::monitor_wait`].
    MonitorWait {
        /// Pointer to the memory location to monitor. Must remain valid
        /// for the duration of the wait.
        addr: *const u8,
    },

    /// Like `MonitorWait` but without an explicit address.
    ///
    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
    /// C0.1 state) for low-power waiting without address monitoring.
    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
    MonitorWaitFallback,
}
102
103impl WaitStrategy {
104 /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
105 /// an `AtomicU64` reference (typically a slot's stamp field).
106 #[inline]
107 pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
108 WaitStrategy::MonitorWait {
109 addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
110 }
111 }
112}
113
114impl Default for WaitStrategy {
115 fn default() -> Self {
116 WaitStrategy::Adaptive {
117 spin_iters: 64,
118 yield_iters: 64,
119 }
120 }
121}
122
/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
///
/// CPUID leaf 7, sub-leaf 0, ECX bit 5.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[inline]
fn has_waitpkg() -> bool {
    #[cfg(target_arch = "x86_64")]
    use core::arch::x86_64::{__cpuid, __cpuid_count};
    #[cfg(target_arch = "x86")]
    use core::arch::x86::{__cpuid, __cpuid_count};

    // SAFETY: CPUID is available on every x86_64 CPU (and on any 32-bit
    // x86 CPU recent enough to plausibly run this code). `__cpuid_count`
    // is an `unsafe fn`, so the calls must sit inside an unsafe block.
    unsafe {
        // Leaf 0's EAX reports the highest supported standard leaf.
        // Querying leaf 7 on a CPU whose max leaf is lower yields
        // undefined garbage, so guard the feature probe explicitly.
        if __cpuid(0).eax < 7 {
            return false;
        }
        __cpuid_count(7, 0).ecx & (1 << 5) != 0
    }
}
140
/// Cached WAITPKG support flag. Evaluated once via a racy init pattern
/// (benign data race — worst case is redundant CPUID calls on first access;
/// every thread computes the same value, so the cache never disagrees).
/// Encoding: 0 = unknown, 1 = not supported, 2 = supported.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
145
146/// 0 = unknown, 1 = not supported, 2 = supported.
147#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
148#[inline]
149fn waitpkg_supported() -> bool {
150 let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
151 if cached != 0 {
152 return cached == 2;
153 }
154 let supported = has_waitpkg();
155 WAITPKG_SUPPORT.store(
156 if supported { 2 } else { 1 },
157 core::sync::atomic::Ordering::Relaxed,
158 );
159 supported
160}
161
162// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
163// These are encoded via raw bytes because stable Rust doesn't expose them
164// as intrinsics yet.
165//
166// UMONITOR: sets up address monitoring (F3 0F AE /6)
167// UMWAIT: wait until store to monitored line or timeout (F2 0F AE /6)
168// TPAUSE: timed pause without address monitoring (66 0F AE /6)
169//
170// EDX:EAX = absolute TSC deadline. The instruction exits when either:
171// (a) a store hits the monitored cache line (UMWAIT only), or
172// (b) TSC >= deadline, or
173// (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
174//
175// We set the deadline ~100µs in the future — long enough to actually
176// enter a low-power state, short enough to bound worst-case latency
177// if the wakeup event is missed (e.g., the store happened between
178// UMONITOR and UMWAIT).
#[cfg(target_arch = "x86_64")]
mod umwait {
    /// Read the TSC and return an absolute deadline ~100µs in the future,
    /// split into the `(eax, edx)` halves that UMWAIT/TPAUSE expect.
    /// On a 3 GHz CPU, 100µs ≈ 300,000 cycles.
    #[inline(always)]
    fn deadline_100us() -> (u32, u32) {
        // SAFETY: RDTSC is an unprivileged instruction available on all
        // x86_64 CPUs (mainstream OSes do not set CR4.TSD to restrict it).
        // The previous version also executed a redundant inline-asm RDTSC
        // whose outputs were discarded; the intrinsic alone suffices.
        let tsc = unsafe { core::arch::x86_64::_rdtsc() };
        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
    }

    /// Set up monitoring on the cache line containing `addr`.
    /// The CPU will track writes to this line until UMWAIT is called.
    #[inline(always)]
    pub(super) unsafe fn umonitor(addr: *const u8) {
        // UMONITOR rax: F3 0F AE /6 (ModRM F0 = rax). Encoded as raw
        // bytes because stable Rust doesn't expose the intrinsic yet.
        // UMONITOR does not modify flags, so `preserves_flags` is sound.
        core::arch::asm!(
            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
            in("rax") addr,
            options(nostack, preserves_flags),
        );
    }

    /// Wait for a write to the monitored address or timeout.
    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
    /// Returns quickly (~30 ns) when the monitored cache line is written.
    /// Deadline is set ~100µs in the future as a safety bound.
    #[inline(always)]
    pub(super) unsafe fn umwait(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // UMWAIT ecx: F2 0F AE /6 (ModRM F1 = ecx).
        // edx:eax = absolute TSC deadline.
        // UMWAIT WRITES CF (set when the wait ended due to the
        // IA32_UMWAIT_CONTROL OS limit), so we must NOT claim
        // `preserves_flags` — doing so would let the compiler assume
        // flags survive across the asm and miscompile.
        core::arch::asm!(
            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }

    /// Timed pause without address monitoring. Enters C0.1 state
    /// until the deadline (~100µs from now).
    /// `ctrl` = 0 for C0.2, 1 for C0.1.
    #[inline(always)]
    pub(super) unsafe fn tpause(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // TPAUSE ecx: 66 0F AE /6 (ModRM F1 = ecx).
        // Like UMWAIT, TPAUSE writes CF on OS-limit timeout, so
        // `preserves_flags` must not be asserted here either.
        core::arch::asm!(
            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }
}
248
// SAFETY: `MonitorWait` stores a raw `*const u8`, which makes the enum
// !Send/!Sync by default. WaitStrategy never dereferences the pointer —
// wait() only hands it to UMONITOR to select a cache line to monitor.
// The pointer is only used during wait(), and since WaitStrategy is
// passed by value to recv_with() and wait() is called synchronously,
// sending or sharing the strategy across threads is sound as long as
// the caller upholds the documented contract that the pointed-to memory
// remains valid for the duration of the wait.
unsafe impl Send for WaitStrategy {}
unsafe impl Sync for WaitStrategy {}
256
257impl WaitStrategy {
258 /// Execute one wait iteration. Called by `recv_with` on each loop when
259 /// `try_recv` returns `Empty`.
260 ///
261 /// `iter` is the zero-based iteration count since the last successful
262 /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
263 #[inline]
264 pub(crate) fn wait(&self, iter: u32) {
265 match self {
266 WaitStrategy::BusySpin => {
267 // No hint — pure busy loop. Fastest wakeup, highest power.
268 }
269 WaitStrategy::YieldSpin => {
270 // On aarch64: SEVL + WFE puts the core into a low-power
271 // state until a cache-line event wakes it. SEVL sets the
272 // local event register so the first WFE returns immediately
273 // (avoids unconditional blocking).
274 // On x86: PAUSE yields the pipeline to the SMT sibling.
275 #[cfg(target_arch = "aarch64")]
276 unsafe {
277 core::arch::asm!("sevl", options(nomem, nostack));
278 core::arch::asm!("wfe", options(nomem, nostack));
279 }
280 #[cfg(not(target_arch = "aarch64"))]
281 core::hint::spin_loop();
282 }
283 WaitStrategy::BackoffSpin => {
284 // Exponential backoff: more iterations as we wait longer.
285 // On aarch64: WFE sleeps until a cache-line event, making
286 // each iteration near-zero power. On x86: PAUSE yields the
287 // pipeline with ~140 cycle delay per iteration.
288 let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
289 for _ in 0..pauses {
290 #[cfg(target_arch = "aarch64")]
291 unsafe {
292 core::arch::asm!("wfe", options(nomem, nostack));
293 }
294 #[cfg(not(target_arch = "aarch64"))]
295 core::hint::spin_loop();
296 }
297 }
298 WaitStrategy::Adaptive {
299 spin_iters,
300 yield_iters,
301 } => {
302 if iter < *spin_iters {
303 // Phase 1: bare spin — fastest wakeup.
304 } else if iter < spin_iters + yield_iters {
305 // Phase 2: PAUSE-spin — yields pipeline.
306 core::hint::spin_loop();
307 } else {
308 // Phase 3: deep backoff — multiple PAUSE per iteration.
309 for _ in 0..8 {
310 core::hint::spin_loop();
311 }
312 }
313 }
314 WaitStrategy::MonitorWait { addr } => {
315 // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
316 // power wakeup on cache-line write (~30 ns wakeup latency).
317 // Falls back to PAUSE on CPUs without WAITPKG.
318 #[cfg(target_arch = "x86_64")]
319 {
320 if waitpkg_supported() {
321 unsafe {
322 umwait::umonitor(*addr);
323 umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
324 }
325 } else {
326 core::hint::spin_loop();
327 }
328 }
329 // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
330 // monitors cache-line invalidation events).
331 #[cfg(target_arch = "aarch64")]
332 {
333 let _ = addr;
334 unsafe {
335 core::arch::asm!("sevl", options(nomem, nostack));
336 core::arch::asm!("wfe", options(nomem, nostack));
337 }
338 }
339 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
340 {
341 let _ = addr;
342 core::hint::spin_loop();
343 }
344 }
345 WaitStrategy::MonitorWaitFallback => {
346 // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
347 // address monitoring — still saves power vs PAUSE.
348 // On aarch64: SEVL + WFE.
349 // Elsewhere: PAUSE.
350 #[cfg(target_arch = "x86_64")]
351 {
352 if waitpkg_supported() {
353 unsafe {
354 umwait::tpause(1); // C0.1
355 }
356 } else {
357 core::hint::spin_loop();
358 }
359 }
360 #[cfg(target_arch = "aarch64")]
361 unsafe {
362 core::arch::asm!("sevl", options(nomem, nostack));
363 core::arch::asm!("wfe", options(nomem, nostack));
364 }
365 #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
366 core::hint::spin_loop();
367 }
368 }
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_is_adaptive() {
        // The documented default: 64 bare-spin + 64 PAUSE-spin iterations.
        let expected = WaitStrategy::Adaptive {
            spin_iters: 64,
            yield_iters: 64,
        };
        assert_eq!(WaitStrategy::default(), expected);
    }

    #[test]
    fn busy_spin_returns_immediately() {
        // Each wait() call must return; the loop just exercises many iters.
        let strategy = WaitStrategy::BusySpin;
        (0..1000).for_each(|n| strategy.wait(n));
    }

    #[test]
    fn yield_spin_returns() {
        let strategy = WaitStrategy::YieldSpin;
        (0..100).for_each(|n| strategy.wait(n));
    }

    #[test]
    fn backoff_spin_returns() {
        let strategy = WaitStrategy::BackoffSpin;
        (0..20).for_each(|n| strategy.wait(n));
    }

    #[test]
    fn adaptive_phases() {
        // 20 iterations crosses all three phases (spin / pause / backoff).
        let strategy = WaitStrategy::Adaptive {
            spin_iters: 4,
            yield_iters: 4,
        };
        (0..20).for_each(|n| strategy.wait(n));
    }

    #[test]
    fn clone_and_copy() {
        let original = WaitStrategy::BusySpin;
        let copied = original;
        #[allow(clippy::clone_on_copy)]
        let cloned = original.clone();
        assert_eq!(original, copied);
        assert_eq!(original, cloned);
    }

    #[test]
    fn debug_format() {
        use alloc::format;
        let rendered = format!("{:?}", WaitStrategy::BusySpin);
        assert!(rendered.contains("BusySpin"));
    }
}