Skip to main content

ph_eventing/
seq_ring.rs

1//! Lock-free SPSC overwrite ring for high-rate telemetry in no-std contexts.
2//!
3//! # Overview
4//! - Single producer, single consumer.
5//! - Producer never blocks; new writes overwrite the oldest slots when the ring wraps.
6//! - Sequence numbers are monotonically increasing `u32`; `0` is reserved to mean "empty".
7//! - The consumer can drain in-order (`poll_one`/`poll_up_to`) or sample the newest value (`latest`).
8//! - If the consumer lags by more than `N`, it skips ahead and reports the number of dropped items.
9//!
10//! # Memory ordering
11//! The producer writes the value, publishes the per-slot sequence, then publishes the newest
12//! sequence. The consumer validates the per-slot sequence before and after reading, which avoids
13//! torn reads when the producer overwrites a slot.
14//!
15//! # Notes
16//! - `T` is `Copy` to allow returning values by copy without allocation.
17//! - The `&T` passed to hooks is a reference to a local copy made during the read.
18
19use core::cell::{Cell, UnsafeCell};
20use core::marker::PhantomData;
21use core::mem::MaybeUninit;
22use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
23#[cfg(test)]
24use core::sync::atomic::AtomicUsize;
25
/// Builds an `N`-element array of atomics, every element starting at `init`.
///
/// Needed because `AtomicU32` is not `Copy`, so `[AtomicU32::new(init); N]` is not available.
fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
    [(); N].map(|()| AtomicU32::new(init))
}
29
/// Builds an `N`-element array of uninitialised, interior-mutable slots for `T`.
fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    [(); N].map(|()| UnsafeCell::new(MaybeUninit::<T>::uninit()))
}
33
// Test-only fault injection: address of the ring whose next slot read should be
// interfered with by `test_after_read_hook`. `0` means the hook is disarmed; the hook
// resets it to `0` after firing once.
#[cfg(test)]
static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
// Test-only fault injection: the sequence value the hook stores into the per-slot
// sequence of the slot that was just read.
#[cfg(test)]
static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);
38
/// Outcome of one [`Consumer::poll_up_to`] call.
///
/// Comparable and defaultable so callers and tests can check a whole result at once;
/// the default value is the "nothing happened, ring empty" result.
#[must_use]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct PollStats {
    /// Number of items delivered to the hook.
    pub read: usize,
    /// Number of items skipped because the consumer lagged or slots were overwritten.
    pub dropped: usize,
    /// Newest sequence observed while polling.
    pub newest: u32,
}
49
50/// Overwrite ring for SPSC high-rate telemetry.
51/// Producer never waits; consumer may drop if it lags > N.
52pub struct SeqRing<T: Copy, const N: usize> {
53    next_seq: AtomicU32,
54    published_seq: AtomicU32,
55    slot_seq: [AtomicU32; N],
56    slots: [UnsafeCell<MaybeUninit<T>>; N],
57    producer_taken: AtomicBool,
58    consumer_taken: AtomicBool,
59}
60
61// SAFETY: SeqRing is Sync because the producer/consumer handles enforce SPSC usage,
62// and all shared state is accessed via atomics. Values are written before their
63// sequence numbers are published with Release and read with Acquire. T: Send ensures
64// values can be transferred across threads safely.
65unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}
66
67impl<T: Copy, const N: usize> SeqRing<T, N> {
68    /// Create a new ring buffer.
69    ///
70    /// # Panics
71    /// Panics if `N == 0`.
72    pub fn new() -> Self {
73        assert!(N > 0);
74        Self {
75            next_seq: AtomicU32::new(0),
76            published_seq: AtomicU32::new(0),
77            slot_seq: atomic_u32_array::<N>(0),
78            slots: unsafe_cell_array::<T, N>(),
79            producer_taken: AtomicBool::new(false),
80            consumer_taken: AtomicBool::new(false),
81        }
82    }
83
84    #[inline(always)]
85    const fn idx_for(seq: u32) -> usize {
86        ((seq.wrapping_sub(1)) as usize) % N
87    }
88
89    /// Create the producer handle. Only one producer may be active.
90    ///
91    /// # Panics
92    /// Panics if a producer handle is already active.
93    #[inline]
94    pub fn producer(&self) -> Producer<'_, T, N> {
95        assert!(
96            !self.producer_taken.swap(true, Ordering::AcqRel),
97            "SeqRing::producer() called while a producer is active"
98        );
99        Producer {
100            ring: self,
101            _not_sync: PhantomData,
102        }
103    }
104
105    /// Create the consumer handle. Only one consumer may be active.
106    ///
107    /// # Panics
108    /// Panics if a consumer handle is already active.
109    #[inline]
110    pub fn consumer(&self) -> Consumer<'_, T, N> {
111        assert!(
112            !self.consumer_taken.swap(true, Ordering::AcqRel),
113            "SeqRing::consumer() called while a consumer is active"
114        );
115        Consumer {
116            ring: self,
117            last_seq: 0,
118            dropped_accum: 0,
119            _not_sync: PhantomData,
120        }
121    }
122
123    #[inline]
124    fn newest_seq(&self) -> u32 {
125        self.published_seq.load(Ordering::Acquire)
126    }
127
128    #[inline]
129    fn push_inner(&self, value: T) -> u32 {
130        let mut seq = self
131            .next_seq
132            .fetch_add(1, Ordering::Relaxed)
133            .wrapping_add(1);
134        if seq == 0 {
135            seq = 1;
136            self.next_seq.store(1, Ordering::Relaxed);
137        }
138
139        let idx = Self::idx_for(seq);
140        unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };
141
142        self.slot_seq[idx].store(seq, Ordering::Release);
143        self.published_seq.store(seq, Ordering::Release);
144        seq
145    }
146
147    #[inline]
148    fn read_seq_inner(&self, seq: u32) -> Option<T> {
149        let idx = Self::idx_for(seq);
150
151        let s1 = self.slot_seq[idx].load(Ordering::Acquire);
152        if s1 != seq {
153            return None;
154        }
155
156        let v = unsafe { (*self.slots[idx].get()).assume_init_read() };
157
158        #[cfg(test)]
159        self.test_after_read_hook(idx);
160
161        let s2 = self.slot_seq[idx].load(Ordering::Acquire);
162        if s2 != seq {
163            return None;
164        }
165
166        Some(v)
167    }
168
169    #[cfg(test)]
170    fn test_after_read_hook(&self, idx: usize) {
171        let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
172        if target == self as *const _ as usize {
173            let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
174            self.slot_seq[idx].store(seq, Ordering::Release);
175            TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
176        }
177    }
178}
179
180/// Producer handle for writing into the ring.
181///
182/// This handle is `!Sync` to prevent concurrent producers.
183pub struct Producer<'a, T: Copy, const N: usize> {
184    ring: &'a SeqRing<T, N>,
185    _not_sync: PhantomData<Cell<()>>,
186}
187
188impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
189    /// Write a value into the ring.
190    ///
191    /// Returns the sequence number assigned to the write (never 0).
192    #[inline]
193    pub fn push(&self, value: T) -> u32 {
194        self.ring.push_inner(value)
195    }
196}
197
198impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
199    fn drop(&mut self) {
200        self.ring.producer_taken.store(false, Ordering::Release);
201    }
202}
203
204/// Consumer handle for reading from the ring.
205///
206/// This handle is `!Sync` to prevent concurrent consumers.
207pub struct Consumer<'a, T: Copy, const N: usize> {
208    ring: &'a SeqRing<T, N>,
209    last_seq: u32,
210    dropped_accum: usize,
211    _not_sync: PhantomData<Cell<()>>,
212}
213
214impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
215    /// How many items have been dropped since consumer creation (or since reset).
216    #[inline]
217    pub fn dropped(&self) -> usize {
218        self.dropped_accum
219    }
220
221    /// Reset the internal drop counter.
222    #[inline]
223    pub fn reset_dropped(&mut self) {
224        self.dropped_accum = 0;
225    }
226
227    /// Drain at most one item (in-order).
228    /// Returns true if an item was delivered to the hook.
229    #[inline]
230    pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
231        let mut hook = Some(hook);
232        let stats = self.poll_up_to(1, |seq, v| {
233            if let Some(hook) = hook.take() {
234                hook(seq, v);
235            }
236        });
237        stats.read == 1
238    }
239
240    /// Drain up to `max` items (in-order).
241    /// Hook sees `&T` but it is a reference to a **local copy** inside poll.
242    ///
243    /// If `max == 0`, this returns immediately with `read = 0`, `dropped = 0`, and
244    /// `newest` set to the latest published sequence.
245    pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
246        if max == 0 {
247            return PollStats {
248                read: 0,
249                dropped: 0,
250                newest: self.ring.newest_seq(),
251            };
252        }
253
254        let mut newest = self.ring.newest_seq();
255        if newest == 0 || newest == self.last_seq {
256            return PollStats {
257                read: 0,
258                dropped: 0,
259                newest,
260            };
261        }
262
263        let mut read = 0usize;
264        let mut dropped = 0usize;
265
266        while read < max {
267            newest = self.ring.newest_seq();
268            if self.last_seq == newest {
269                break;
270            }
271
272            let lag = newest.wrapping_sub(self.last_seq) as usize;
273            if lag > N {
274                let next = self.last_seq.wrapping_add(1);
275                let keep_from = newest.wrapping_sub((N - 1) as u32);
276                let jump_drops = keep_from.wrapping_sub(next) as usize;
277                dropped += jump_drops;
278                self.last_seq = keep_from.wrapping_sub(1);
279                continue;
280            }
281
282            let next = self.last_seq.wrapping_add(1);
283
284            match self.ring.read_seq_inner(next) {
285                Some(v) => {
286                    hook(next, &v);
287                    self.last_seq = next;
288                    read += 1;
289                }
290                None => {
291                    self.last_seq = next;
292                    dropped += 1;
293                }
294            }
295        }
296
297        self.dropped_accum += dropped;
298
299        PollStats {
300            read,
301            dropped,
302            newest,
303        }
304    }
305
306    /// "Give me the newest thing right now" (not in-order).
307    /// Returns true if it delivered something.
308    ///
309    /// This does not advance the consumer cursor.
310    #[inline]
311    pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
312        let newest = self.ring.newest_seq();
313        if newest == 0 {
314            return false;
315        }
316        if let Some(v) = self.ring.read_seq_inner(newest) {
317            hook(newest, &v);
318            true
319        } else {
320            false
321        }
322    }
323
324    /// Fast-forward consumer so the *next* `poll_one()` yields the newest item
325    /// (i.e. skip backlog).
326    ///
327    /// This does not modify the dropped counter.
328    #[inline]
329    pub fn skip_to_latest(&mut self) {
330        let newest = self.ring.newest_seq();
331        if newest != 0 {
332            self.last_seq = newest.wrapping_sub(1);
333        }
334    }
335}
336
337impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
338    fn drop(&mut self) {
339        self.ring.consumer_taken.store(false, Ordering::Release);
340    }
341}
342
// Single-threaded unit tests for the ring. Concurrent overwrites are simulated through the
// `TEST_AFTER_READ_*` fault-injection statics rather than real threads.
#[cfg(test)]
mod tests {
    use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
    use core::sync::atomic::Ordering;
    use std::vec::Vec;

    // Polling a ring that never received a push delivers nothing.
    #[test]
    fn poll_one_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();
        let ok = consumer.poll_one(|_, _| {});
        assert!(!ok);
    }

    // Items come back oldest-first with sequence numbers starting at 1.
    #[test]
    fn polls_in_order() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 3);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 3);
        assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
    }

    // Ten pushes into a 4-slot ring: only the newest four survive, six are reported dropped.
    #[test]
    fn drops_when_consumer_lags() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..10 {
            producer.push(i);
        }

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 4);
        assert_eq!(stats.dropped, 6);
        assert_eq!(stats.newest, 10);
        assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
    }

    // `latest` samples the most recent push regardless of the cursor.
    #[test]
    fn latest_reads_newest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let consumer = ring.consumer();

        producer.push(1);
        producer.push(2);

        let mut got = None;
        let ok = consumer.latest(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((2, 2)));
    }

    // After `skip_to_latest`, the next in-order poll yields the newest item.
    #[test]
    fn skip_to_latest_makes_next_poll_latest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        consumer.skip_to_latest();

        let mut got = None;
        let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((3, 12)));
    }

    // `max == 0` only reports the newest sequence and never calls the hook.
    #[test]
    fn poll_up_to_zero_returns_newest_only() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(42);

        let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 1);
    }

    // The running drop total matches the per-poll count and can be cleared.
    #[test]
    fn dropped_counter_can_reset() {
        let ring = SeqRing::<u32, 2>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..5 {
            producer.push(i);
        }

        let stats = consumer.poll_up_to(10, |_, _| {});

        assert_eq!(consumer.dropped(), stats.dropped);

        consumer.reset_dropped();

        assert_eq!(consumer.dropped(), 0);
    }

    // `latest` on a ring that never received a push delivers nothing.
    #[test]
    fn latest_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    // Force `published_seq` ahead of the slot sequence: the slot check must reject the read.
    #[test]
    fn latest_returns_false_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    // Same forced mismatch through the in-order path: the item counts as dropped.
    #[test]
    fn poll_up_to_counts_dropped_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 1);
        assert_eq!(consumer.dropped(), 1);
    }

    // Arm the fault-injection hook so the slot sequence changes between the value copy and
    // the second sequence check; the read must be discarded.
    #[test]
    fn read_seq_inner_detects_overwrite_during_read() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let seq = producer.push(7);

        TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
        TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);

        let got = ring.read_seq_inner(seq);

        // Disarm explicitly as well, in case the hook did not fire.
        TEST_AFTER_READ_TARGET.store(0, Ordering::Release);

        assert!(got.is_none());
    }

    // When the sequence counter overflows, numbering restarts at 1 because 0 is reserved.
    #[test]
    fn push_wraps_seq_from_zero_to_one() {
        let ring = SeqRing::<u32, 4>::new();

        ring.next_seq.store(u32::MAX, Ordering::Relaxed);

        let seq = ring.producer().push(1);

        assert_eq!(seq, 1);
        assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
    }
}