Skip to main content

photon_ring/
wait.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wait strategies for blocking receive operations.
5//!
6//! [`WaitStrategy`] controls how a consumer thread waits when no message is
7//! available. All strategies are `no_std` compatible.
8//!
9//! | Strategy | Latency | CPU usage | Best for |
10//! |---|---|---|---|
11//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
12//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
13//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
14//! | `Adaptive` | Auto-scaling | Varies | General purpose |
15//!
16//! # Platform-specific optimizations
17//!
18//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
19//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
20//! `YIELD`). `WFE` puts the core into a low-power state until an event —
21//! such as a cache line invalidation from the publisher's store — wakes it.
22//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
23//! so the first `WFE` doesn't block unconditionally.
24//!
25//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
26//! standard spin-wait hint (~140 cycles on Skylake+).
27//!
28// NOTE: Intel Tremont+ CPUs support UMWAIT/TPAUSE instructions for
29// user-mode cache line monitoring. These would allow near-zero latency
30// wakeup without burning CPU. Not yet implemented — requires CPUID
31// feature detection (WAITPKG) and is only available on recent Intel.
32
33/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
34///
35/// All variants are `no_std` compatible — no OS thread primitives required.
36///
37/// | Strategy | Latency | CPU usage | Best for |
38/// |---|---|---|---|
39/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
40/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
41/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
42/// | `Adaptive` | Auto-scaling | Varies | General purpose |
43/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum WaitStrategy {
46    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
47    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
48    BusySpin,
49
50    /// Spin with `core::hint::spin_loop()` (PAUSE on x86, YIELD on ARM)
51    /// between iterations. Yields the CPU pipeline to the SMT sibling
52    /// and reduces power consumption vs `BusySpin`.
53    YieldSpin,
54
55    /// Exponential backoff spin. Starts with bare spins, then escalates
56    /// to PAUSE-based spins with increasing delays. Good for consumers
57    /// that may be idle for extended periods without burning a full core.
58    BackoffSpin,
59
60    /// Three-phase escalation: bare spin for `spin_iters` iterations,
61    /// then PAUSE-spin for `yield_iters`, then repeated PAUSE bursts.
62    Adaptive {
63        /// Number of bare-spin iterations before escalating to PAUSE.
64        spin_iters: u32,
65        /// Number of PAUSE iterations before entering deep backoff.
66        yield_iters: u32,
67    },
68
69    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
70    ///
71    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
72    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
73    /// state until a write to the monitored cache line wakes it. Near-zero
74    /// power consumption with ~30 ns wakeup latency.
75    ///
76    /// Falls back to `YieldSpin` on x86 CPUs without WAITPKG support.
77    /// On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
78    ///
79    /// `addr` must point to the stamp field of the slot being waited on.
80    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
81    /// `AtomicU64` reference. For callers that cannot provide an address,
82    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
83    ///
84    /// # Safety (of direct construction)
85    ///
86    /// Constructing this variant directly with an arbitrary pointer can
87    /// cause hardware faults on WAITPKG-capable CPUs. Prefer the safe
88    /// constructor [`WaitStrategy::monitor_wait`].
89    MonitorWait {
90        /// Pointer to the memory location to monitor. Must remain valid
91        /// for the duration of the wait.
92        addr: *const u8,
93    },
94
95    /// Like `MonitorWait` but without an explicit address.
96    ///
97    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
98    /// C0.1 state) for low-power waiting without address monitoring.
99    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
100    MonitorWaitFallback,
101}
102
103impl WaitStrategy {
104    /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
105    /// an `AtomicU64` reference (typically a slot's stamp field).
106    #[inline]
107    pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
108        WaitStrategy::MonitorWait {
109            addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
110        }
111    }
112}
113
114impl Default for WaitStrategy {
115    fn default() -> Self {
116        WaitStrategy::Adaptive {
117            spin_iters: 64,
118            yield_iters: 64,
119        }
120    }
121}
122
123/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
124///
125/// CPUID leaf 7, sub-leaf 0, ECX bit 5.
126#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
127#[inline]
128fn has_waitpkg() -> bool {
129    #[cfg(target_arch = "x86_64")]
130    {
131        // SAFETY: CPUID is always available on x86_64.
132        let result = unsafe { core::arch::x86_64::__cpuid_count(7, 0) };
133        result.ecx & (1 << 5) != 0
134    }
135    #[cfg(target_arch = "x86")]
136    {
137        let result = unsafe { core::arch::x86::__cpuid_count(7, 0) };
138        result.ecx & (1 << 5) != 0
139    }
140}
141
142/// Cached WAITPKG support flag. Evaluated once via a racy init pattern
143/// (benign data race — worst case is redundant CPUID calls on first access).
144#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
145static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
146
147/// 0 = unknown, 1 = not supported, 2 = supported.
148#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
149#[inline]
150fn waitpkg_supported() -> bool {
151    let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
152    if cached != 0 {
153        return cached == 2;
154    }
155    let supported = has_waitpkg();
156    WAITPKG_SUPPORT.store(
157        if supported { 2 } else { 1 },
158        core::sync::atomic::Ordering::Relaxed,
159    );
160    supported
161}
162
163// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
164// These are encoded via raw bytes because stable Rust doesn't expose them
165// as intrinsics yet.
166//
167// UMONITOR: sets up address monitoring (F3 0F AE /6)
168// UMWAIT:   wait until store to monitored line or timeout (F2 0F AE /6)
169// TPAUSE:   timed pause without address monitoring (66 0F AE /6)
170//
171// EDX:EAX = absolute TSC deadline. The instruction exits when either:
172//   (a) a store hits the monitored cache line (UMWAIT only), or
173//   (b) TSC >= deadline, or
174//   (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
175//
176// We set the deadline ~100µs in the future — long enough to actually
177// enter a low-power state, short enough to bound worst-case latency
178// if the wakeup event is missed (e.g., the store happened between
179// UMONITOR and UMWAIT).
180#[cfg(target_arch = "x86_64")]
181mod umwait {
182    /// Read the TSC and return a deadline ~100µs in the future.
183    /// On a 3 GHz CPU, 100µs ≈ 300,000 cycles.
184    #[inline(always)]
185    fn deadline_100us() -> (u32, u32) {
186        let tsc: u64;
187        unsafe {
188            core::arch::asm!(
189                "rdtsc",
190                out("eax") _,
191                out("edx") _,
192                // rdtsc writes eax and edx; we read the full 64-bit value
193                // via a combined read below.
194                options(nostack, preserves_flags),
195            );
196            // Re-read via the intrinsic for a clean 64-bit value.
197            tsc = core::arch::x86_64::_rdtsc();
198        }
199        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
200        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
201    }
202
203    /// Set up monitoring on the cache line containing `addr`.
204    /// The CPU will track writes to this line until UMWAIT is called.
205    #[inline(always)]
206    pub(super) unsafe fn umonitor(addr: *const u8) {
207        // UMONITOR rax: F3 0F AE /6 (with rax)
208        core::arch::asm!(
209            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
210            in("rax") addr,
211            options(nostack, preserves_flags),
212        );
213    }
214
215    /// Wait for a write to the monitored address or timeout.
216    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
217    /// Returns quickly (~30 ns) when the monitored cache line is written.
218    /// Deadline is set ~100µs in the future as a safety bound.
219    #[inline(always)]
220    pub(super) unsafe fn umwait(ctrl: u32) {
221        let (lo, hi) = deadline_100us();
222        // UMWAIT ecx: F2 0F AE /6 (with ecx for control)
223        // edx:eax = absolute TSC deadline
224        core::arch::asm!(
225            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
226            in("ecx") ctrl,
227            in("edx") hi,
228            in("eax") lo,
229            options(nostack, preserves_flags),
230        );
231    }
232
233    /// Timed pause without address monitoring. Enters C0.1 state
234    /// until the deadline (~100µs from now).
235    /// `ctrl` = 0 for C0.2, 1 for C0.1.
236    #[inline(always)]
237    pub(super) unsafe fn tpause(ctrl: u32) {
238        let (lo, hi) = deadline_100us();
239        // TPAUSE ecx: 66 0F AE /6 (with ecx for control)
240        core::arch::asm!(
241            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
242            in("ecx") ctrl,
243            in("edx") hi,
244            in("eax") lo,
245            options(nostack, preserves_flags),
246        );
247    }
248}
249
250// SAFETY: MonitorWait stores a raw pointer for the monitored address.
251// The pointer is only used during wait() and must remain valid for
252// the duration of the wait. Since WaitStrategy is passed by value to
253// recv_with() and wait() is called synchronously, this is safe as long
254// as the caller ensures the pointed-to memory outlives the wait.
255unsafe impl Send for WaitStrategy {}
256unsafe impl Sync for WaitStrategy {}
257
258impl WaitStrategy {
259    /// Execute one wait iteration. Called by `recv_with` on each loop when
260    /// `try_recv` returns `Empty`.
261    ///
262    /// `iter` is the zero-based iteration count since the last successful
263    /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
264    #[inline]
265    pub(crate) fn wait(&self, iter: u32) {
266        match self {
267            WaitStrategy::BusySpin => {
268                // No hint — pure busy loop. Fastest wakeup, highest power.
269            }
270            WaitStrategy::YieldSpin => {
271                // On aarch64: SEVL + WFE puts the core into a low-power
272                // state until a cache-line event wakes it. SEVL sets the
273                // local event register so the first WFE returns immediately
274                // (avoids unconditional blocking).
275                // On x86: PAUSE yields the pipeline to the SMT sibling.
276                #[cfg(target_arch = "aarch64")]
277                unsafe {
278                    core::arch::asm!("sevl", options(nomem, nostack));
279                    core::arch::asm!("wfe", options(nomem, nostack));
280                }
281                #[cfg(not(target_arch = "aarch64"))]
282                core::hint::spin_loop();
283            }
284            WaitStrategy::BackoffSpin => {
285                // Exponential backoff: more iterations as we wait longer.
286                // On aarch64: WFE sleeps until a cache-line event, making
287                // each iteration near-zero power. On x86: PAUSE yields the
288                // pipeline with ~140 cycle delay per iteration.
289                let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
290                for _ in 0..pauses {
291                    #[cfg(target_arch = "aarch64")]
292                    unsafe {
293                        core::arch::asm!("wfe", options(nomem, nostack));
294                    }
295                    #[cfg(not(target_arch = "aarch64"))]
296                    core::hint::spin_loop();
297                }
298            }
299            WaitStrategy::Adaptive {
300                spin_iters,
301                yield_iters,
302            } => {
303                if iter < *spin_iters {
304                    // Phase 1: bare spin — fastest wakeup.
305                } else if iter < spin_iters + yield_iters {
306                    // Phase 2: PAUSE-spin — yields pipeline.
307                    core::hint::spin_loop();
308                } else {
309                    // Phase 3: deep backoff — multiple PAUSE per iteration.
310                    for _ in 0..8 {
311                        core::hint::spin_loop();
312                    }
313                }
314            }
315            WaitStrategy::MonitorWait { addr } => {
316                // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
317                // power wakeup on cache-line write (~30 ns wakeup latency).
318                // Falls back to PAUSE on CPUs without WAITPKG.
319                #[cfg(target_arch = "x86_64")]
320                {
321                    if waitpkg_supported() {
322                        unsafe {
323                            umwait::umonitor(*addr);
324                            umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
325                        }
326                    } else {
327                        core::hint::spin_loop();
328                    }
329                }
330                // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
331                // monitors cache-line invalidation events).
332                #[cfg(target_arch = "aarch64")]
333                {
334                    let _ = addr;
335                    unsafe {
336                        core::arch::asm!("sevl", options(nomem, nostack));
337                        core::arch::asm!("wfe", options(nomem, nostack));
338                    }
339                }
340                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
341                {
342                    let _ = addr;
343                    core::hint::spin_loop();
344                }
345            }
346            WaitStrategy::MonitorWaitFallback => {
347                // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
348                // address monitoring — still saves power vs PAUSE.
349                // On aarch64: SEVL + WFE.
350                // Elsewhere: PAUSE.
351                #[cfg(target_arch = "x86_64")]
352                {
353                    if waitpkg_supported() {
354                        unsafe {
355                            umwait::tpause(1); // C0.1
356                        }
357                    } else {
358                        core::hint::spin_loop();
359                    }
360                }
361                #[cfg(target_arch = "aarch64")]
362                unsafe {
363                    core::arch::asm!("sevl", options(nomem, nostack));
364                    core::arch::asm!("wfe", options(nomem, nostack));
365                }
366                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
367                core::hint::spin_loop();
368            }
369        }
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376
377    #[test]
378    fn default_is_adaptive() {
379        let ws = WaitStrategy::default();
380        assert_eq!(
381            ws,
382            WaitStrategy::Adaptive {
383                spin_iters: 64,
384                yield_iters: 64,
385            }
386        );
387    }
388
389    #[test]
390    fn busy_spin_returns_immediately() {
391        let ws = WaitStrategy::BusySpin;
392        for i in 0..1000 {
393            ws.wait(i);
394        }
395    }
396
397    #[test]
398    fn yield_spin_returns() {
399        let ws = WaitStrategy::YieldSpin;
400        for i in 0..100 {
401            ws.wait(i);
402        }
403    }
404
405    #[test]
406    fn backoff_spin_returns() {
407        let ws = WaitStrategy::BackoffSpin;
408        for i in 0..20 {
409            ws.wait(i);
410        }
411    }
412
413    #[test]
414    fn adaptive_phases() {
415        let ws = WaitStrategy::Adaptive {
416            spin_iters: 4,
417            yield_iters: 4,
418        };
419        for i in 0..20 {
420            ws.wait(i);
421        }
422    }
423
424    #[test]
425    fn clone_and_copy() {
426        let ws = WaitStrategy::BusySpin;
427        let ws2 = ws;
428        #[allow(clippy::clone_on_copy)]
429        let ws3 = ws.clone();
430        assert_eq!(ws, ws2);
431        assert_eq!(ws, ws3);
432    }
433
434    #[test]
435    fn debug_format() {
436        use alloc::format;
437        let ws = WaitStrategy::BusySpin;
438        let s = format!("{ws:?}");
439        assert!(s.contains("BusySpin"));
440    }
441}