// photon_ring/wait.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Wait strategies for blocking receive operations.
5//!
6//! [`WaitStrategy`] controls how a consumer thread waits when no message is
7//! available. All strategies are `no_std` compatible.
8//!
9//! | Strategy | Latency | CPU usage | Best for |
10//! |---|---|---|---|
11//! | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
12//! | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
13//! | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
//! | `Adaptive` | Auto-scaling | Varies | General purpose |
//! | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
//! | `MonitorWaitFallback` | Medium (timed pause) | Low | Low-power wait without an address |
15//!
16//! # Platform-specific optimizations
17//!
18//! On **aarch64**, `YieldSpin` and `BackoffSpin` use the `WFE` (Wait For
19//! Event) instruction instead of `core::hint::spin_loop()` (which maps to
20//! `YIELD`). `WFE` puts the core into a low-power state until an event —
21//! such as a cache line invalidation from the publisher's store — wakes it.
22//! The `SEVL` + `WFE` pattern is used: `SEVL` sets the local event register
23//! so the first `WFE` doesn't block unconditionally.
24//!
25//! On **x86/x86_64**, `core::hint::spin_loop()` emits `PAUSE`, which is the
26//! standard spin-wait hint (~140 cycles on Skylake+).
27//!
// NOTE: Intel Tremont+ CPUs support the UMWAIT/TPAUSE (WAITPKG) instructions
// for user-mode cache line monitoring, allowing near-zero-latency wakeup
// without burning CPU. These are implemented below — see `MonitorWait`,
// `MonitorWaitFallback`, and the `umwait` module — guarded by runtime CPUID
// (WAITPKG) feature detection, with a PAUSE fallback on unsupported CPUs.
32
/// Strategy for blocking `recv()` and `SubscriberGroup::recv()`.
///
/// All variants are `no_std` compatible — no OS thread primitives required.
///
/// | Strategy | Latency | CPU usage | Best for |
/// |---|---|---|---|
/// | `BusySpin` | Lowest (~0 ns wakeup) | 100% core | Dedicated, pinned cores |
/// | `YieldSpin` | Low (~30 ns on x86) | High | Shared cores, SMT |
/// | `BackoffSpin` | Medium (exponential) | Decreasing | Background consumers |
/// | `Adaptive` | Auto-scaling | Varies | General purpose |
/// | `MonitorWait` | Near-zero (~30 ns on Intel) | Near-zero | Intel Alder Lake+ |
// NOTE: `PartialEq`/`Eq` on `MonitorWait` compares the raw `addr` pointer by
// address, which is the intended identity semantics for a monitored location.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitStrategy {
    /// Pure busy-spin with no PAUSE instruction. Minimum wakeup latency
    /// but consumes 100% of one CPU core. Use on dedicated, pinned cores.
    BusySpin,

    /// Spin with `core::hint::spin_loop()` (PAUSE on x86, YIELD on ARM)
    /// between iterations. Yields the CPU pipeline to the SMT sibling
    /// and reduces power consumption vs `BusySpin`.
    YieldSpin,

    /// Exponential backoff spin. Starts with bare spins, then escalates
    /// to PAUSE-based spins with increasing delays. Good for consumers
    /// that may be idle for extended periods without burning a full core.
    BackoffSpin,

    /// Three-phase escalation: bare spin for `spin_iters` iterations,
    /// then PAUSE-spin for `yield_iters`, then repeated PAUSE bursts.
    Adaptive {
        /// Number of bare-spin iterations before escalating to PAUSE.
        spin_iters: u32,
        /// Number of PAUSE iterations before entering deep backoff.
        yield_iters: u32,
    },

    /// UMONITOR/UMWAIT on Intel (Tremont+, Alder Lake+) or WFE on ARM.
    ///
    /// On x86_64 with WAITPKG support: `UMONITOR` sets up a monitored
    /// address range, `UMWAIT` puts the core into an optimized C0.1/C0.2
    /// state until a write to the monitored cache line wakes it. Near-zero
    /// power consumption with ~30 ns wakeup latency.
    ///
    /// Falls back to `YieldSpin` on x86 CPUs without WAITPKG support.
    /// On aarch64: uses SEVL+WFE (identical to `YieldSpin`).
    ///
    /// `addr` must point to the stamp field of the slot being waited on.
    /// Use [`WaitStrategy::monitor_wait`] to construct safely from an
    /// `AtomicU64` reference. For callers that cannot provide an address,
    /// use [`MonitorWaitFallback`](WaitStrategy::MonitorWaitFallback).
    ///
    /// # Safety (of direct construction)
    ///
    /// The pointer must remain valid for the duration of the wait.
    /// Constructing this variant directly with a pointer to stack memory
    /// or memory that may be deallocated is undefined behavior. Always
    /// prefer the safe constructor [`WaitStrategy::monitor_wait()`].
    /// Note that even the safe constructor cannot tie the pointer's
    /// lifetime to the returned value — the caller must still ensure the
    /// pointed-to memory outlives every `wait()` call.
    MonitorWait {
        /// Pointer to the memory location to monitor. Must remain valid
        /// for the duration of the wait.
        addr: *const u8,
    },

    /// Like `MonitorWait` but without an explicit address.
    ///
    /// On x86_64 with WAITPKG: falls back to `TPAUSE` (timed wait in
    /// C0.1 state) for low-power waiting without address monitoring.
    /// On aarch64: SEVL+WFE. On other x86: PAUSE.
    MonitorWaitFallback,
}
103
104impl WaitStrategy {
105    /// Safely construct a [`MonitorWait`](WaitStrategy::MonitorWait) from
106    /// an `AtomicU64` reference (typically a slot's stamp field).
107    #[inline]
108    pub fn monitor_wait(stamp: &core::sync::atomic::AtomicU64) -> Self {
109        WaitStrategy::MonitorWait {
110            addr: stamp as *const core::sync::atomic::AtomicU64 as *const u8,
111        }
112    }
113}
114
115impl Default for WaitStrategy {
116    fn default() -> Self {
117        WaitStrategy::Adaptive {
118            spin_iters: 64,
119            yield_iters: 64,
120        }
121    }
122}
123
/// Check at runtime whether the CPU supports WAITPKG (UMONITOR/UMWAIT/TPAUSE).
///
/// CPUID leaf 7, sub-leaf 0, ECX bit 5.
///
/// Leaf 7 is only queried after confirming the maximum supported basic
/// leaf (leaf 0, EAX) is at least 7 — querying an out-of-range leaf
/// returns data from the highest basic leaf, which could spuriously set
/// bit 5 on very old CPUs.
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
#[inline]
fn has_waitpkg() -> bool {
    // Fix: `__cpuid`/`__cpuid_count` are `unsafe fn`s in `core::arch`;
    // the previous version called them outside an `unsafe` block, which
    // does not compile.
    #[cfg(target_arch = "x86_64")]
    {
        // SAFETY: the CPUID instruction is always available in 64-bit mode.
        unsafe {
            if core::arch::x86_64::__cpuid(0).eax < 7 {
                return false;
            }
            core::arch::x86_64::__cpuid_count(7, 0).ecx & (1 << 5) != 0
        }
    }
    #[cfg(target_arch = "x86")]
    {
        // SAFETY: assumes a CPUID-capable CPU (i586 or later), which is
        // implied by any 32-bit x86 target Rust supports.
        unsafe {
            if core::arch::x86::__cpuid(0).eax < 7 {
                return false;
            }
            core::arch::x86::__cpuid_count(7, 0).ecx & (1 << 5) != 0
        }
    }
}
141
142/// Cached WAITPKG support flag. Evaluated once via a racy init pattern
143/// (benign data race — worst case is redundant CPUID calls on first access).
144#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
145static WAITPKG_SUPPORT: core::sync::atomic::AtomicU8 = core::sync::atomic::AtomicU8::new(0);
146
147/// 0 = unknown, 1 = not supported, 2 = supported.
148#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
149#[inline]
150fn waitpkg_supported() -> bool {
151    let cached = WAITPKG_SUPPORT.load(core::sync::atomic::Ordering::Relaxed);
152    if cached != 0 {
153        return cached == 2;
154    }
155    let supported = has_waitpkg();
156    WAITPKG_SUPPORT.store(
157        if supported { 2 } else { 1 },
158        core::sync::atomic::Ordering::Relaxed,
159    );
160    supported
161}
162
// SAFETY wrappers for UMONITOR/UMWAIT/TPAUSE instructions.
// These are encoded via raw bytes because stable Rust doesn't expose them
// as intrinsics yet.
//
// UMONITOR: sets up address monitoring (F3 0F AE /6)
// UMWAIT:   wait until store to monitored line or timeout (F2 0F AE /6)
// TPAUSE:   timed pause without address monitoring (66 0F AE /6)
//
// EDX:EAX = absolute TSC deadline. The instruction exits when either:
//   (a) a store hits the monitored cache line (UMWAIT only), or
//   (b) TSC >= deadline, or
//   (c) an OS-configured timeout (IA32_UMWAIT_CONTROL MSR) fires.
//
// We set the deadline ~100µs in the future — long enough to actually
// enter a low-power state, short enough to bound worst-case latency
// if the wakeup event is missed (e.g., the store happened between
// UMONITOR and UMWAIT).
#[cfg(target_arch = "x86_64")]
mod umwait {
    /// Read the TSC and return a deadline ~100µs in the future, split as
    /// `(eax, edx)` halves for the UMWAIT/TPAUSE register convention.
    ///
    /// Note: The 300,000 cycle offset assumes ~3 GHz TSC frequency. On slower
    /// CPUs (1 GHz), this becomes ~300 µs; on faster CPUs (5 GHz), ~60 µs.
    /// The deadline is a safety bound, not a precision target.
    #[inline(always)]
    fn deadline_100us() -> (u32, u32) {
        // SAFETY: RDTSC is available on all x86_64 CPUs.
        let tsc = unsafe { core::arch::x86_64::_rdtsc() };
        let deadline = tsc.wrapping_add(300_000); // ~100µs at 3 GHz
        (deadline as u32, (deadline >> 32) as u32) // (eax, edx)
    }

    /// Set up monitoring on the cache line containing `addr`.
    /// The CPU will track writes to this line until UMWAIT is called.
    ///
    /// # Safety
    /// Caller must ensure WAITPKG is supported (see `waitpkg_supported`)
    /// and that `addr` is valid for reads.
    #[inline(always)]
    pub(super) unsafe fn umonitor(addr: *const u8) {
        // UMONITOR rax: F3 0F AE /6 (with rax). Does not modify flags.
        core::arch::asm!(
            ".byte 0xf3, 0x0f, 0xae, 0xf0", // UMONITOR rax
            in("rax") addr,
            options(nostack, preserves_flags),
        );
    }

    /// Wait for a write to the monitored address or timeout.
    /// `ctrl` = 0 for C0.2 (deeper sleep), 1 for C0.1 (lighter sleep).
    /// Returns quickly (~30 ns) when the monitored cache line is written.
    /// Deadline is set ~100µs in the future as a safety bound.
    ///
    /// # Safety
    /// Caller must ensure WAITPKG is supported and UMONITOR was armed first.
    #[inline(always)]
    pub(super) unsafe fn umwait(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // UMWAIT ecx: F2 0F AE /6 (with ecx for control)
        // edx:eax = absolute TSC deadline
        //
        // Fix: UMWAIT writes RFLAGS — it sets CF when it returns because of
        // the OS limit (IA32_UMWAIT_CONTROL) rather than the requested
        // deadline, and clears OF/SF/ZF/AF/PF. The previous
        // `preserves_flags` option was therefore incorrect and could let
        // the compiler assume flags survive the asm block. Removed.
        core::arch::asm!(
            ".byte 0xf2, 0x0f, 0xae, 0xf1", // UMWAIT ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }

    /// Timed pause without address monitoring. Enters C0.1 state
    /// until the deadline (~100µs from now).
    /// `ctrl` = 0 for C0.2, 1 for C0.1.
    ///
    /// # Safety
    /// Caller must ensure WAITPKG is supported.
    #[inline(always)]
    pub(super) unsafe fn tpause(ctrl: u32) {
        let (lo, hi) = deadline_100us();
        // TPAUSE ecx: 66 0F AE /6 (with ecx for control)
        //
        // Fix: like UMWAIT, TPAUSE sets CF on OS-limit expiry and clears
        // the other arithmetic flags, so `preserves_flags` must not be
        // claimed here. Removed.
        core::arch::asm!(
            ".byte 0x66, 0x0f, 0xae, 0xf1", // TPAUSE ecx
            in("ecx") ctrl,
            in("edx") hi,
            in("eax") lo,
            options(nostack),
        );
    }
}
245
// SAFETY: `WaitStrategy` is `Send`/`Sync` despite the raw `addr` pointer in
// `MonitorWait`. The pointer is never dereferenced through these impls — it
// is only passed to UMONITOR inside `wait()`, and the pointed-to stamp is an
// `AtomicU64` (itself `Sync`), so concurrent reads of the monitored line are
// fine. The remaining obligation — that the pointed-to memory outlives every
// `wait()` call — rests on the caller, as documented on `MonitorWait`.
// NOTE(review): this assumes `wait()` is always called synchronously by the
// owner of the strategy value (as `recv_with` does) — confirm no caller
// stashes a `MonitorWait` beyond the stamp's lifetime.
unsafe impl Send for WaitStrategy {}
unsafe impl Sync for WaitStrategy {}
253
254impl WaitStrategy {
255    /// Execute one wait iteration. Called by `recv_with` on each loop when
256    /// `try_recv` returns `Empty`.
257    ///
258    /// `iter` is the zero-based iteration count since the last successful
259    /// receive — it drives phase transitions in `Adaptive` and `BackoffSpin`.
260    #[inline]
261    pub(crate) fn wait(&self, iter: u32) {
262        match self {
263            WaitStrategy::BusySpin => {
264                // No hint — pure busy loop. Fastest wakeup, highest power.
265            }
266            WaitStrategy::YieldSpin => {
267                // On aarch64: SEVL + WFE puts the core into a low-power
268                // state until a cache-line event wakes it. SEVL sets the
269                // local event register so the first WFE returns immediately
270                // (avoids unconditional blocking).
271                // On x86: PAUSE yields the pipeline to the SMT sibling.
272                #[cfg(target_arch = "aarch64")]
273                unsafe {
274                    core::arch::asm!("sevl", options(nomem, nostack));
275                    core::arch::asm!("wfe", options(nomem, nostack));
276                }
277                #[cfg(not(target_arch = "aarch64"))]
278                core::hint::spin_loop();
279            }
280            WaitStrategy::BackoffSpin => {
281                // Exponential backoff: more iterations as we wait longer.
282                // On aarch64: WFE sleeps until a cache-line event, making
283                // each iteration near-zero power. On x86: PAUSE yields the
284                // pipeline with ~140 cycle delay per iteration.
285                let pauses = 1u32.wrapping_shl(iter.min(6)); // 1, 2, 4, 8, 16, 32, 64
286                for _ in 0..pauses {
287                    #[cfg(target_arch = "aarch64")]
288                    unsafe {
289                        core::arch::asm!("wfe", options(nomem, nostack));
290                    }
291                    #[cfg(not(target_arch = "aarch64"))]
292                    core::hint::spin_loop();
293                }
294            }
295            WaitStrategy::Adaptive {
296                spin_iters,
297                yield_iters,
298            } => {
299                if iter < *spin_iters {
300                    // Phase 1: bare spin — fastest wakeup.
301                } else if iter < spin_iters + yield_iters {
302                    // Phase 2: PAUSE-spin — yields pipeline.
303                    core::hint::spin_loop();
304                } else {
305                    // Phase 3: deep backoff — multiple PAUSE per iteration.
306                    for _ in 0..8 {
307                        core::hint::spin_loop();
308                    }
309                }
310            }
311            WaitStrategy::MonitorWait { addr } => {
312                // On x86_64 with WAITPKG: UMONITOR + UMWAIT for near-zero
313                // power wakeup on cache-line write (~30 ns wakeup latency).
314                // Falls back to PAUSE on CPUs without WAITPKG.
315                #[cfg(target_arch = "x86_64")]
316                {
317                    if waitpkg_supported() {
318                        unsafe {
319                            umwait::umonitor(*addr);
320                            umwait::umwait(1); // C0.1 — lighter sleep, faster wakeup
321                        }
322                    } else {
323                        core::hint::spin_loop();
324                    }
325                }
326                // On aarch64: SEVL + WFE (same as YieldSpin — WFE already
327                // monitors cache-line invalidation events).
328                #[cfg(target_arch = "aarch64")]
329                {
330                    let _ = addr;
331                    unsafe {
332                        core::arch::asm!("sevl", options(nomem, nostack));
333                        core::arch::asm!("wfe", options(nomem, nostack));
334                    }
335                }
336                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
337                {
338                    let _ = addr;
339                    core::hint::spin_loop();
340                }
341            }
342            WaitStrategy::MonitorWaitFallback => {
343                // On x86_64 with WAITPKG: TPAUSE enters C0.1 without
344                // address monitoring — still saves power vs PAUSE.
345                // On aarch64: SEVL + WFE.
346                // Elsewhere: PAUSE.
347                #[cfg(target_arch = "x86_64")]
348                {
349                    if waitpkg_supported() {
350                        unsafe {
351                            umwait::tpause(1); // C0.1
352                        }
353                    } else {
354                        core::hint::spin_loop();
355                    }
356                }
357                #[cfg(target_arch = "aarch64")]
358                unsafe {
359                    core::arch::asm!("sevl", options(nomem, nostack));
360                    core::arch::asm!("wfe", options(nomem, nostack));
361                }
362                #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
363                core::hint::spin_loop();
364            }
365        }
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must yield the documented 64/64 Adaptive configuration.
    #[test]
    fn default_is_adaptive() {
        let expected = WaitStrategy::Adaptive {
            spin_iters: 64,
            yield_iters: 64,
        };
        assert_eq!(WaitStrategy::default(), expected);
    }

    /// Smoke test: `BusySpin` never blocks, at any iteration count.
    #[test]
    fn busy_spin_returns_immediately() {
        let ws = WaitStrategy::BusySpin;
        (0..1000).for_each(|i| ws.wait(i));
    }

    /// Smoke test: `YieldSpin` returns from every iteration.
    #[test]
    fn yield_spin_returns() {
        let ws = WaitStrategy::YieldSpin;
        (0..100).for_each(|i| ws.wait(i));
    }

    /// Smoke test: `BackoffSpin` returns even at max backoff depth.
    #[test]
    fn backoff_spin_returns() {
        let ws = WaitStrategy::BackoffSpin;
        (0..20).for_each(|i| ws.wait(i));
    }

    /// Drive `Adaptive` through all three phases (bare spin, PAUSE, deep
    /// backoff) and confirm each returns.
    #[test]
    fn adaptive_phases() {
        let ws = WaitStrategy::Adaptive {
            spin_iters: 4,
            yield_iters: 4,
        };
        (0..20).for_each(|i| ws.wait(i));
    }

    /// `WaitStrategy` is `Copy` and `Clone`, and copies compare equal.
    #[test]
    fn clone_and_copy() {
        let ws = WaitStrategy::BusySpin;
        let copied = ws;
        #[allow(clippy::clone_on_copy)]
        let cloned = ws.clone();
        assert_eq!(ws, copied);
        assert_eq!(ws, cloned);
    }

    /// `Debug` output names the variant.
    #[test]
    fn debug_format() {
        use alloc::format;
        let repr = format!("{:?}", WaitStrategy::BusySpin);
        assert!(repr.contains("BusySpin"));
    }
}
437}