Skip to main content

photon_ring/
wait.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wait strategies for blocking receive operations.
5//!
6//! [`WaitStrategy`] controls how a consumer thread waits when no message is
7//! available. All strategies are `no_std` compatible.
8//!
9//! | Strategy | Latency | CPU usage | Best for |
10//! |---|---|---|---|
11//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
12//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
13//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
14//! | `Adaptive` | Auto-scaling | Varies | General purpose |
15//!
16//! # Platform-specific optimizations
17//!
18//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
19//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
20//! `YIELD`). `WFE` puts the core into a low-power state until an event —
21//! such as a cache line invalidation from the publisher's store — wakes it.
22//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
23//! so the first `WFE` doesn't block unconditionally.
24//!
25//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
26//! standard spin-wait hint (~140 cycles on Skylake+).
27//!
28// NOTE: Intel Tremont+ CPUs support UMWAIT/TPAUSE instructions for
29// user-mode cache line monitoring. These would allow near-zero latency
30// wakeup without burning CPU. Not yet implemented — requires CPUID
31// feature detection (WAITPKG) and is only available on recent Intel.
32
33/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
34///
35/// All variants are `no_std` compatible — no OS thread primitives required.
36///
37/// | Strategy | Latency | CPU usage | Best for |
38/// |---|---|---|---|
39/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
40/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
41/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
42/// | `Adaptive` | Auto-scaling | Varies | General purpose |
43/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum WaitStrategy {
46    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
47    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
48    BusySpin,
49
50    /// Spin with `core::hint::spin_loop()` (PAUSE on x86, YIELD on ARM)
51    /// between iterations. Yields the CPU pipeline to the SMT sibling
52    /// and reduces power consumption vs `BusySpin`.
53    YieldSpin,
54
55    /// Exponential backoff spin. Starts with bare spins, then escalates
56    /// to PAUSE-based spins with increasing delays. Good for consumers
57    /// that may be idle for extended periods without burning a full core.
58    BackoffSpin,
59
60    /// Three-phase escalation: bare spin for `spin_iters` iterations,
61    /// then PAUSE-spin for `yield_iters`, then repeated PAUSE bursts.
62    Adaptive {
63        /// Number of bare-spin iterations before escalating to PAUSE.
64        spin_iters: u32,
65        /// Number of PAUSE iterations before entering deep backoff.
66        yield_iters: u32,
67    },
68
69    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
70    ///
71    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
72    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
73    /// state until a write to the monitored cache line wakes it. Near-zero
74    /// power consumption with ~30 ns wakeup latency.
75    ///
76    /// Falls back to `YieldSpin` on x86 CPUs without WAITPKG support.
77    /// On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
78    ///
79    /// `addr` must point to the stamp field of the slot being waited on.
80    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
81    /// `AtomicU64` reference. For callers that cannot provide an address,
82    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
83    ///
84    /// # Safety (of direct construction)
85    ///
86    /// Constructing this variant directly with an arbitrary pointer can
87    /// cause hardware faults on WAITPKG-capable CPUs. Prefer the safe
88    /// constructor [`WaitStrategy::monitor_wait`].
89    MonitorWait {
90        /// Pointer to the memory location to monitor. Must remain valid
91        /// for the duration of the wait.
92        addr: *const u8,
93    },
94
95    /// Like `MonitorWait` but without an explicit address.
96    ///
97    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
98    /// C0.1 state) for low-power waiting without address monitoring.
99    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
100    MonitorWaitFallback,
101}
102
103impl WaitStrategy {
104    /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
105    /// an `AtomicU64` reference (typically a slot's stamp field).
106    #[inline]
107    pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
108        WaitStrategy::MonitorWait {
109            addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
110        }
111    }
112}
113
114impl Default for WaitStrategy {
115    fn default() -> Self {
116        WaitStrategy::Adaptive {
117            spin_iters: 64,
118            yield_iters: 64,
119        }
120    }
121}
122
123/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
124///
125/// CPUID leaf 7, sub-leaf 0, ECX bit 5.
126#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
127#[inline]
128fn has_waitpkg() -> bool {
129    #[cfg(target_arch = "x86_64")]
130    {
131        let result = core::arch::x86_64::__cpuid_count(7, 0);
132        result.ecx & (1 << 5) != 0
133    }
134    #[cfg(target_arch = "x86")]
135    {
136        let result = core::arch::x86::__cpuid_count(7, 0);
137        result.ecx & (1 << 5) != 0
138    }
139}
140
141/// Cached WAITPKG support flag. Evaluated once via a racy init pattern
142/// (benign data race — worst case is redundant CPUID calls on first access).
143#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
144static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
145
146/// 0 = unknown, 1 = not supported, 2 = supported.
147#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
148#[inline]
149fn waitpkg_supported() -> bool {
150    let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
151    if cached != 0 {
152        return cached == 2;
153    }
154    let supported = has_waitpkg();
155    WAITPKG_SUPPORT.store(
156        if supported { 2 } else { 1 },
157        core::sync::atomic::Ordering::Relaxed,
158    );
159    supported
160}
161
162// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
163// These are encoded via raw bytes because stable Rust doesn't expose them
164// as intrinsics yet.
165//
166// UMONITOR: sets up address monitoring (F3 0F AE /6)
167// UMWAIT:   wait until store to monitored line or timeout (F2 0F AE /6)
168// TPAUSE:   timed pause without address monitoring (66 0F AE /6)
169//
170// EDX:EAX = absolute TSC deadline. The instruction exits when either:
171//   (a) a store hits the monitored cache line (UMWAIT only), or
172//   (b) TSC >= deadline, or
173//   (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
174//
175// We set the deadline ~100µs in the future — long enough to actually
176// enter a low-power state, short enough to bound worst-case latency
177// if the wakeup event is missed (e.g., the store happened between
178// UMONITOR and UMWAIT).
179#[cfg(target_arch = "x86_64")]
180mod umwait {
181    /// Read the TSC and return a deadline ~100µs in the future.
182    /// On a 3 GHz CPU, 100µs ≈ 300,000 cycles.
183    #[inline(always)]
184    fn deadline_100us() -> (u32, u32) {
185        let tsc: u64;
186        unsafe {
187            core::arch::asm!(
188                "rdtsc",
189                out("eax") _,
190                out("edx") _,
191                // rdtsc writes eax and edx; we read the full 64-bit value
192                // via a combined read below.
193                options(nostack, preserves_flags),
194            );
195            // Re-read via the intrinsic for a clean 64-bit value.
196            tsc = core::arch::x86_64::_rdtsc();
197        }
198        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
199        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
200    }
201
202    /// Set up monitoring on the cache line containing `addr`.
203    /// The CPU will track writes to this line until UMWAIT is called.
204    #[inline(always)]
205    pub(super) unsafe fn umonitor(addr: *const u8) {
206        // UMONITOR rax: F3 0F AE /6 (with rax)
207        core::arch::asm!(
208            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
209            in("rax") addr,
210            options(nostack, preserves_flags),
211        );
212    }
213
214    /// Wait for a write to the monitored address or timeout.
215    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
216    /// Returns quickly (~30 ns) when the monitored cache line is written.
217    /// Deadline is set ~100µs in the future as a safety bound.
218    #[inline(always)]
219    pub(super) unsafe fn umwait(ctrl: u32) {
220        let (lo, hi) = deadline_100us();
221        // UMWAIT ecx: F2 0F AE /6 (with ecx for control)
222        // edx:eax = absolute TSC deadline
223        core::arch::asm!(
224            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
225            in("ecx") ctrl,
226            in("edx") hi,
227            in("eax") lo,
228            options(nostack, preserves_flags),
229        );
230    }
231
232    /// Timed pause without address monitoring. Enters C0.1 state
233    /// until the deadline (~100µs from now).
234    /// `ctrl` = 0 for C0.2, 1 for C0.1.
235    #[inline(always)]
236    pub(super) unsafe fn tpause(ctrl: u32) {
237        let (lo, hi) = deadline_100us();
238        // TPAUSE ecx: 66 0F AE /6 (with ecx for control)
239        core::arch::asm!(
240            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
241            in("ecx") ctrl,
242            in("edx") hi,
243            in("eax") lo,
244            options(nostack, preserves_flags),
245        );
246    }
247}
248
249// SAFETY: MonitorWait stores a raw pointer for the monitored address.
250// The pointer is only used during wait() and must remain valid for
251// the duration of the wait. Since WaitStrategy is passed by value to
252// recv_with() and wait() is called synchronously, this is safe as long
253// as the caller ensures the pointed-to memory outlives the wait.
254unsafe impl Send for WaitStrategy {}
255unsafe impl Sync for WaitStrategy {}
256
257impl WaitStrategy {
258    /// Execute one wait iteration. Called by `recv_with` on each loop when
259    /// `try_recv` returns `Empty`.
260    ///
261    /// `iter` is the zero-based iteration count since the last successful
262    /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
263    #[inline]
264    pub(crate) fn wait(&self, iter: u32) {
265        match self {
266            WaitStrategy::BusySpin => {
267                // No hint — pure busy loop. Fastest wakeup, highest power.
268            }
269            WaitStrategy::YieldSpin => {
270                // On aarch64: SEVL + WFE puts the core into a low-power
271                // state until a cache-line event wakes it. SEVL sets the
272                // local event register so the first WFE returns immediately
273                // (avoids unconditional blocking).
274                // On x86: PAUSE yields the pipeline to the SMT sibling.
275                #[cfg(target_arch = "aarch64")]
276                unsafe {
277                    core::arch::asm!("sevl", options(nomem, nostack));
278                    core::arch::asm!("wfe", options(nomem, nostack));
279                }
280                #[cfg(not(target_arch = "aarch64"))]
281                core::hint::spin_loop();
282            }
283            WaitStrategy::BackoffSpin => {
284                // Exponential backoff: more iterations as we wait longer.
285                // On aarch64: WFE sleeps until a cache-line event, making
286                // each iteration near-zero power. On x86: PAUSE yields the
287                // pipeline with ~140 cycle delay per iteration.
288                let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
289                for _ in 0..pauses {
290                    #[cfg(target_arch = "aarch64")]
291                    unsafe {
292                        core::arch::asm!("wfe", options(nomem, nostack));
293                    }
294                    #[cfg(not(target_arch = "aarch64"))]
295                    core::hint::spin_loop();
296                }
297            }
298            WaitStrategy::Adaptive {
299                spin_iters,
300                yield_iters,
301            } => {
302                if iter < *spin_iters {
303                    // Phase 1: bare spin — fastest wakeup.
304                } else if iter < spin_iters + yield_iters {
305                    // Phase 2: PAUSE-spin — yields pipeline.
306                    core::hint::spin_loop();
307                } else {
308                    // Phase 3: deep backoff — multiple PAUSE per iteration.
309                    for _ in 0..8 {
310                        core::hint::spin_loop();
311                    }
312                }
313            }
314            WaitStrategy::MonitorWait { addr } => {
315                // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
316                // power wakeup on cache-line write (~30 ns wakeup latency).
317                // Falls back to PAUSE on CPUs without WAITPKG.
318                #[cfg(target_arch = "x86_64")]
319                {
320                    if waitpkg_supported() {
321                        unsafe {
322                            umwait::umonitor(*addr);
323                            umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
324                        }
325                    } else {
326                        core::hint::spin_loop();
327                    }
328                }
329                // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
330                // monitors cache-line invalidation events).
331                #[cfg(target_arch = "aarch64")]
332                {
333                    let _ = addr;
334                    unsafe {
335                        core::arch::asm!("sevl", options(nomem, nostack));
336                        core::arch::asm!("wfe", options(nomem, nostack));
337                    }
338                }
339                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
340                {
341                    let _ = addr;
342                    core::hint::spin_loop();
343                }
344            }
345            WaitStrategy::MonitorWaitFallback => {
346                // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
347                // address monitoring — still saves power vs PAUSE.
348                // On aarch64: SEVL + WFE.
349                // Elsewhere: PAUSE.
350                #[cfg(target_arch = "x86_64")]
351                {
352                    if waitpkg_supported() {
353                        unsafe {
354                            umwait::tpause(1); // C0.1
355                        }
356                    } else {
357                        core::hint::spin_loop();
358                    }
359                }
360                #[cfg(target_arch = "aarch64")]
361                unsafe {
362                    core::arch::asm!("sevl", options(nomem, nostack));
363                    core::arch::asm!("wfe", options(nomem, nostack));
364                }
365                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
366                core::hint::spin_loop();
367            }
368        }
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375
376    #[test]
377    fn default_is_adaptive() {
378        let ws = WaitStrategy::default();
379        assert_eq!(
380            ws,
381            WaitStrategy::Adaptive {
382                spin_iters: 64,
383                yield_iters: 64,
384            }
385        );
386    }
387
388    #[test]
389    fn busy_spin_returns_immediately() {
390        let ws = WaitStrategy::BusySpin;
391        for i in 0..1000 {
392            ws.wait(i);
393        }
394    }
395
396    #[test]
397    fn yield_spin_returns() {
398        let ws = WaitStrategy::YieldSpin;
399        for i in 0..100 {
400            ws.wait(i);
401        }
402    }
403
404    #[test]
405    fn backoff_spin_returns() {
406        let ws = WaitStrategy::BackoffSpin;
407        for i in 0..20 {
408            ws.wait(i);
409        }
410    }
411
412    #[test]
413    fn adaptive_phases() {
414        let ws = WaitStrategy::Adaptive {
415            spin_iters: 4,
416            yield_iters: 4,
417        };
418        for i in 0..20 {
419            ws.wait(i);
420        }
421    }
422
423    #[test]
424    fn clone_and_copy() {
425        let ws = WaitStrategy::BusySpin;
426        let ws2 = ws;
427        #[allow(clippy::clone_on_copy)]
428        let ws3 = ws.clone();
429        assert_eq!(ws, ws2);
430        assert_eq!(ws, ws3);
431    }
432
433    #[test]
434    fn debug_format() {
435        use alloc::format;
436        let ws = WaitStrategy::BusySpin;
437        let s = format!("{ws:?}");
438        assert!(s.contains("BusySpin"));
439    }
440}