// ph_eventing/seq_ring.rs
//! Lock-free SPSC overwrite ring for high-rate telemetry in no-std contexts.
//!
//! # Overview
//! - Single producer, single consumer.
//! - Producer never blocks; new writes overwrite the oldest slots when the ring wraps.
//! - Sequence numbers are monotonically increasing `u32`; `0` is reserved to mean "empty".
//! - The consumer can drain in-order (`poll_one`/`poll_up_to`) or sample the newest value (`latest`).
//! - If the consumer lags by more than `N`, it skips ahead and reports the number of dropped items.
//!
//! # Memory ordering
//! The producer writes the value, publishes the per-slot sequence, then publishes the newest
//! sequence. The consumer validates the per-slot sequence before and after reading, which avoids
//! torn reads when the producer overwrites a slot.
//!
//! # Notes
//! - `T` is `Copy` to allow returning values by copy without allocation.
//! - The `&T` passed to hooks is a reference to a local copy made during the read.
18
use core::cell::{Cell, UnsafeCell};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
#[cfg(test)]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
#[cfg(target_has_atomic = "32")]
use core::sync::atomic::{AtomicBool, AtomicU32};
#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
use portable_atomic::{AtomicBool, AtomicU32};
29
/// Build an `[AtomicU32; N]` with every element initialized to `init`.
fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
    [(); N].map(|()| AtomicU32::new(init))
}
33
/// Build an array of `N` uninitialized, interior-mutable slots.
fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    [(); N].map(|()| UnsafeCell::new(MaybeUninit::uninit()))
}
37
// Test-only hook state: when TEST_AFTER_READ_TARGET holds the address of a
// specific ring, `read_seq_inner` rewrites the just-read slot's sequence to
// TEST_AFTER_READ_SEQ, simulating a producer overwrite in the window between
// the data read and the validation re-check. 0 means "hook disarmed".
#[cfg(test)]
static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
#[cfg(test)]
static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);
42
/// Result of a single [`Consumer::poll_up_to`] call.
#[must_use]
#[derive(Copy, Clone, Debug)]
pub struct PollStats {
    /// Number of items delivered to the hook.
    pub read: usize,
    /// Number of items skipped because the consumer lagged or slots were overwritten.
    pub dropped: usize,
    /// Newest sequence observed while polling (0 if nothing has been published).
    pub newest: u32,
}
53
/// Overwrite ring for SPSC high-rate telemetry.
/// Producer never waits; consumer may drop if it lags > N.
pub struct SeqRing<T: Copy, const N: usize> {
    // Last sequence number handed out by the producer; 0 = nothing pushed yet.
    next_seq: AtomicU32,
    // Newest fully published sequence; 0 = nothing published yet.
    published_seq: AtomicU32,
    // Per-slot sequence, used to detect torn/overwritten reads; 0 = slot never written.
    slot_seq: [AtomicU32; N],
    // Value storage; slots[i] is initialized iff slot_seq[i] != 0 (value is
    // written before the sequence is published).
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    // Guards ensuring at most one live Producer / Consumer handle at a time.
    producer_taken: AtomicBool,
    consumer_taken: AtomicBool,
}
64
// SAFETY: SeqRing is Sync because the producer/consumer handles enforce SPSC usage,
// and all shared state is accessed via atomics. Values are written before their
// sequence numbers are published with Release and read with Acquire. The consumer's
// optimistic slot read is validated by re-checking slot_seq, so overwrites are
// detected rather than prevented. T: Send ensures values can be transferred across
// threads safely.
unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}
70
71impl<T: Copy, const N: usize> SeqRing<T, N> {
72    /// Create a new ring buffer.
73    ///
74    /// # Panics
75    /// Panics if `N == 0`.
76    pub fn new() -> Self {
77        assert!(N > 0);
78        Self {
79            next_seq: AtomicU32::new(0),
80            published_seq: AtomicU32::new(0),
81            slot_seq: atomic_u32_array::<N>(0),
82            slots: unsafe_cell_array::<T, N>(),
83            producer_taken: AtomicBool::new(false),
84            consumer_taken: AtomicBool::new(false),
85        }
86    }
87
88    #[inline(always)]
89    const fn idx_for(seq: u32) -> usize {
90        ((seq.wrapping_sub(1)) as usize) % N
91    }
92
93    /// Create the producer handle. Only one producer may be active.
94    ///
95    /// # Panics
96    /// Panics if a producer handle is already active.
97    #[inline]
98    pub fn producer(&self) -> Producer<'_, T, N> {
99        assert!(
100            !self.producer_taken.swap(true, Ordering::AcqRel),
101            "SeqRing::producer() called while a producer is active"
102        );
103        Producer {
104            ring: self,
105            _not_sync: PhantomData,
106        }
107    }
108
109    /// Create the consumer handle. Only one consumer may be active.
110    ///
111    /// # Panics
112    /// Panics if a consumer handle is already active.
113    #[inline]
114    pub fn consumer(&self) -> Consumer<'_, T, N> {
115        assert!(
116            !self.consumer_taken.swap(true, Ordering::AcqRel),
117            "SeqRing::consumer() called while a consumer is active"
118        );
119        Consumer {
120            ring: self,
121            last_seq: 0,
122            dropped_accum: 0,
123            _not_sync: PhantomData,
124        }
125    }
126
127    #[inline]
128    fn newest_seq(&self) -> u32 {
129        self.published_seq.load(Ordering::Acquire)
130    }
131
132    #[inline]
133    fn push_inner(&self, value: T) -> u32 {
134        let mut seq = self
135            .next_seq
136            .fetch_add(1, Ordering::Relaxed)
137            .wrapping_add(1);
138        if seq == 0 {
139            seq = 1;
140            self.next_seq.store(1, Ordering::Relaxed);
141        }
142
143        let idx = Self::idx_for(seq);
144        unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };
145
146        self.slot_seq[idx].store(seq, Ordering::Release);
147        self.published_seq.store(seq, Ordering::Release);
148        seq
149    }
150
151    #[inline]
152    fn read_seq_inner(&self, seq: u32) -> Option<T> {
153        let idx = Self::idx_for(seq);
154
155        let s1 = self.slot_seq[idx].load(Ordering::Acquire);
156        if s1 != seq {
157            return None;
158        }
159
160        let v = unsafe { (*self.slots[idx].get()).assume_init_read() };
161
162        #[cfg(test)]
163        self.test_after_read_hook(idx);
164
165        let s2 = self.slot_seq[idx].load(Ordering::Acquire);
166        if s2 != seq {
167            return None;
168        }
169
170        Some(v)
171    }
172
173    #[cfg(test)]
174    fn test_after_read_hook(&self, idx: usize) {
175        let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
176        if target == self as *const _ as usize {
177            let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
178            self.slot_seq[idx].store(seq, Ordering::Release);
179            TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
180        }
181    }
182}
183
184impl<T: Copy, const N: usize> Default for SeqRing<T, N> {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
/// Producer handle for writing into the ring.
///
/// This handle is `!Sync` to prevent concurrent producers.
pub struct Producer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    // PhantomData<Cell<()>> makes the handle !Sync without affecting layout.
    _not_sync: PhantomData<Cell<()>>,
}
197
198impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
199    /// Write a value into the ring.
200    ///
201    /// Returns the sequence number assigned to the write (never 0).
202    #[inline]
203    pub fn push(&self, value: T) -> u32 {
204        self.ring.push_inner(value)
205    }
206}
207
impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
    // Release the guard so SeqRing::producer() can hand out a new handle.
    fn drop(&mut self) {
        self.ring.producer_taken.store(false, Ordering::Release);
    }
}
213
/// Consumer handle for reading from the ring.
///
/// This handle is `!Sync` to prevent concurrent consumers.
pub struct Consumer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    // Last sequence delivered or skipped; 0 = nothing consumed yet.
    last_seq: u32,
    // Running total of dropped items, reported by dropped()/reset_dropped().
    dropped_accum: usize,
    // PhantomData<Cell<()>> makes the handle !Sync without affecting layout.
    _not_sync: PhantomData<Cell<()>>,
}
223
impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
    /// How many items have been dropped since consumer creation (or since reset).
    #[inline]
    pub fn dropped(&self) -> usize {
        self.dropped_accum
    }

    /// Reset the internal drop counter.
    #[inline]
    pub fn reset_dropped(&mut self) {
        self.dropped_accum = 0;
    }

    /// Drain at most one item (in-order).
    /// Returns true if an item was delivered to the hook.
    #[inline]
    pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
        // Adapt the FnOnce hook to poll_up_to's FnMut interface. With max = 1
        // the closure runs at most once, so take() always yields the hook.
        let mut hook = Some(hook);
        let stats = self.poll_up_to(1, |seq, v| {
            if let Some(hook) = hook.take() {
                hook(seq, v);
            }
        });
        stats.read == 1
    }

    /// Drain up to `max` items (in-order).
    /// Hook sees `&T` but it is a reference to a **local copy** inside poll.
    ///
    /// If `max == 0`, this returns immediately with `read = 0`, `dropped = 0`, and
    /// `newest` set to the latest published sequence.
    pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
        if max == 0 {
            return PollStats {
                read: 0,
                dropped: 0,
                newest: self.ring.newest_seq(),
            };
        }

        let mut newest = self.ring.newest_seq();
        // newest == 0 means nothing has ever been published.
        if newest == 0 || newest == self.last_seq {
            return PollStats {
                read: 0,
                dropped: 0,
                newest,
            };
        }

        let mut read = 0usize;
        let mut dropped = 0usize;

        while read < max {
            // Re-sample every iteration: the producer may publish concurrently.
            newest = self.ring.newest_seq();
            if self.last_seq == newest {
                break;
            }

            // Sequence distance between the producer and this consumer.
            let lag = newest.wrapping_sub(self.last_seq) as usize;
            if lag > N {
                // Lagged past the whole ring: anything older than the newest N
                // sequences has been overwritten. Jump the cursor to the oldest
                // still-resident item and account for everything skipped.
                let next = self.last_seq.wrapping_add(1);
                let keep_from = newest.wrapping_sub((N - 1) as u32);
                let jump_drops = keep_from.wrapping_sub(next) as usize;
                dropped += jump_drops;
                // Park the cursor just before keep_from so the next iteration
                // reads keep_from itself.
                self.last_seq = keep_from.wrapping_sub(1);
                continue;
            }

            let next = self.last_seq.wrapping_add(1);

            match self.ring.read_seq_inner(next) {
                Some(v) => {
                    hook(next, &v);
                    self.last_seq = next;
                    read += 1;
                }
                None => {
                    // Slot no longer holds `next` (overwritten mid-read or
                    // already recycled): skip it and count it as dropped.
                    self.last_seq = next;
                    dropped += 1;
                }
            }
        }

        self.dropped_accum += dropped;

        PollStats {
            read,
            dropped,
            newest,
        }
    }

    /// "Give me the newest thing right now" (not in-order).
    /// Returns true if it delivered something.
    ///
    /// This does not advance the consumer cursor.
    #[inline]
    pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
        let newest = self.ring.newest_seq();
        if newest == 0 {
            return false;
        }
        if let Some(v) = self.ring.read_seq_inner(newest) {
            hook(newest, &v);
            true
        } else {
            false
        }
    }

    /// Fast-forward consumer so the *next* `poll_one()` yields the newest item
    /// (i.e. skip backlog).
    ///
    /// This does not modify the dropped counter.
    #[inline]
    pub fn skip_to_latest(&mut self) {
        let newest = self.ring.newest_seq();
        if newest != 0 {
            // Park just before `newest`; the next poll reads `newest` itself.
            self.last_seq = newest.wrapping_sub(1);
        }
    }
}
346
impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
    // Release the guard so SeqRing::consumer() can hand out a new handle.
    fn drop(&mut self) {
        self.ring.consumer_taken.store(false, Ordering::Release);
    }
}
352
#[cfg(test)]
mod tests {
    use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
    use core::sync::atomic::Ordering;
    use std::vec::Vec;

    // An empty ring delivers nothing.
    #[test]
    fn poll_one_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();
        let ok = consumer.poll_one(|_, _| {});
        assert!(!ok);
    }

    // In-capacity backlog is delivered in order with sequences 1, 2, 3.
    #[test]
    fn polls_in_order() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 3);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 3);
        assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
    }

    // 10 pushes into N=4: consumer skips the 6 overwritten items and
    // delivers the newest 4.
    #[test]
    fn drops_when_consumer_lags() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..10 {
            producer.push(i);
        }

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 4);
        assert_eq!(stats.dropped, 6);
        assert_eq!(stats.newest, 10);
        assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
    }

    // latest() samples the newest value without draining.
    #[test]
    fn latest_reads_newest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let consumer = ring.consumer();

        producer.push(1);
        producer.push(2);

        let mut got = None;
        let ok = consumer.latest(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((2, 2)));
    }

    // skip_to_latest() parks the cursor so the next poll yields the newest item.
    #[test]
    fn skip_to_latest_makes_next_poll_latest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        consumer.skip_to_latest();

        let mut got = None;
        let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((3, 12)));
    }

    // max == 0 is a pure "peek newest" call: the hook must never run.
    #[test]
    fn poll_up_to_zero_returns_newest_only() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(42);

        let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 1);
    }

    // The accumulated drop counter matches the per-call stats and resets to 0.
    #[test]
    fn dropped_counter_can_reset() {
        let ring = SeqRing::<u32, 2>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..5 {
            producer.push(i);
        }

        let stats = consumer.poll_up_to(10, |_, _| {});

        assert_eq!(consumer.dropped(), stats.dropped);

        consumer.reset_dropped();

        assert_eq!(consumer.dropped(), 0);
    }

    // latest() on an untouched ring reports nothing.
    #[test]
    fn latest_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    // White-box: a published sequence with no matching slot write (simulated by
    // poking published_seq directly) must fail slot validation, not deliver junk.
    #[test]
    fn latest_returns_false_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    // White-box: the same missing-slot situation counts as a drop when polling.
    #[test]
    fn poll_up_to_counts_dropped_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 1);
        assert_eq!(consumer.dropped(), 1);
    }

    // Arms the test hook so the slot sequence changes between the data read and
    // the validation re-check; the torn read must be discarded.
    #[test]
    fn read_seq_inner_detects_overwrite_during_read() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let seq = producer.push(7);

        TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
        TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);

        let got = ring.read_seq_inner(seq);

        // Disarm in case the hook did not fire (defensive cleanup).
        TEST_AFTER_READ_TARGET.store(0, Ordering::Release);

        assert!(got.is_none());
    }

    // White-box: incrementing past u32::MAX skips the reserved sequence 0.
    #[test]
    fn push_wraps_seq_from_zero_to_one() {
        let ring = SeqRing::<u32, 4>::new();

        ring.next_seq.store(u32::MAX, Ordering::Relaxed);

        let seq = ring.producer().push(1);

        assert_eq!(seq, 1);
        assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
    }
}