
ph_eventing/seq_ring.rs

//! Lock-free SPSC overwrite ring for high-rate telemetry in no-std contexts.
//!
//! # Overview
//! - Single producer, single consumer.
//! - Producer never blocks; new writes overwrite the oldest slots when the ring wraps.
//! - Sequence numbers are monotonically increasing `u32` values starting at 1 (wrapping
//!   back to 1 after `u32::MAX`); `0` is reserved to mean "empty".
//! - The consumer can drain in-order (`poll_one`/`poll_up_to`) or sample the newest value (`latest`).
//! - If the consumer lags by more than `N`, it skips ahead and reports the number of dropped items.
//!
//! # Memory ordering
//! The producer writes the value, publishes the per-slot sequence, then publishes the newest
//! sequence. The consumer validates the per-slot sequence before and after reading, which avoids
//! torn reads when the producer overwrites a slot.
//!
//! # Notes
//! - `T` is `Copy` to allow returning values by copy without allocation.
//! - The `&T` passed to hooks is a reference to a local copy made during the read.
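//!
//! # Example
//! A minimal single-threaded sketch of the producer/consumer flow. The import path is an
//! assumption (adjust it to wherever this module lives in your crate), so the block is
//! marked `ignore` rather than run as a doctest.
//!
//! ```ignore
//! use ph_eventing::seq_ring::SeqRing;
//!
//! // Capacity 8; once full, the producer overwrites the oldest slot.
//! let ring = SeqRing::<u32, 8>::new();
//! let producer = ring.producer();
//! let mut consumer = ring.consumer();
//!
//! producer.push(42);
//! producer.push(43);
//!
//! // Drain in order; the hook receives the sequence number and a reference
//! // to a local copy of the value.
//! let stats = consumer.poll_up_to(16, |seq, value| {
//!     let _ = (seq, *value);
//! });
//! assert_eq!(stats.read, 2);
//! assert_eq!(stats.dropped, 0);
//! ```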

use core::cell::{Cell, UnsafeCell};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
#[cfg(test)]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
#[cfg(target_has_atomic = "32")]
use core::sync::atomic::{AtomicBool, AtomicU32};
#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
use portable_atomic::{AtomicBool, AtomicU32};

fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
    core::array::from_fn(|_| AtomicU32::new(init))
}

fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
}

#[cfg(test)]
static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
#[cfg(test)]
static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);

#[must_use]
#[derive(Copy, Clone, Debug)]
pub struct PollStats {
    /// Number of items delivered to the hook.
    pub read: usize,
    /// Number of items skipped because the consumer lagged or slots were overwritten.
    pub dropped: usize,
    /// Newest sequence observed while polling.
    pub newest: u32,
}

/// Overwrite ring for SPSC high-rate telemetry.
/// Producer never waits; consumer may drop if it lags > N.
pub struct SeqRing<T: Copy, const N: usize> {
    next_seq: AtomicU32,
    published_seq: AtomicU32,
    slot_seq: [AtomicU32; N],
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    producer_taken: AtomicBool,
    consumer_taken: AtomicBool,
}

// SAFETY: SeqRing is Sync because the producer/consumer handles enforce SPSC usage,
// and all shared state is accessed via atomics. Values are written before their
// sequence numbers are published with Release and read with Acquire. T: Send ensures
// values can be transferred across threads safely.
unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}

impl<T: Copy, const N: usize> SeqRing<T, N> {
    /// Create a new ring buffer.
    ///
    /// # Panics
    /// Panics if `N == 0`.
    pub fn new() -> Self {
        assert!(N > 0);
        Self {
            next_seq: AtomicU32::new(0),
            published_seq: AtomicU32::new(0),
            slot_seq: atomic_u32_array::<N>(0),
            slots: unsafe_cell_array::<T, N>(),
            producer_taken: AtomicBool::new(false),
            consumer_taken: AtomicBool::new(false),
        }
    }

    /// Maximum number of items the ring can hold.
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Map a non-zero sequence number to its slot index.
    #[inline(always)]
    const fn idx_for(seq: u32) -> usize {
        ((seq.wrapping_sub(1)) as usize) % N
    }

    /// Create the producer handle. Only one producer may be active.
    ///
    /// # Panics
    /// Panics if a producer handle is already active.
    #[inline]
    pub fn producer(&self) -> Producer<'_, T, N> {
        assert!(
            !self.producer_taken.swap(true, Ordering::AcqRel),
            "SeqRing::producer() called while a producer is active"
        );
        Producer {
            ring: self,
            _not_sync: PhantomData,
        }
    }

    /// Create the consumer handle. Only one consumer may be active.
    ///
    /// # Panics
    /// Panics if a consumer handle is already active.
    #[inline]
    pub fn consumer(&self) -> Consumer<'_, T, N> {
        assert!(
            !self.consumer_taken.swap(true, Ordering::AcqRel),
            "SeqRing::consumer() called while a consumer is active"
        );
        Consumer {
            ring: self,
            last_seq: 0,
            dropped_accum: 0,
            _not_sync: PhantomData,
        }
    }

    /// Latest published sequence number (`0` if nothing has been pushed yet).
    #[inline]
    fn newest_seq(&self) -> u32 {
        self.published_seq.load(Ordering::Acquire)
    }

    /// Write `value` into the next slot and publish its sequence number.
    /// Sequence numbers skip `0`, which is reserved to mean "empty".
    #[inline]
    fn push_inner(&self, value: T) -> u32 {
        let mut seq = self
            .next_seq
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if seq == 0 {
            seq = 1;
            self.next_seq.store(1, Ordering::Relaxed);
        }

        let idx = Self::idx_for(seq);
        unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };

        self.slot_seq[idx].store(seq, Ordering::Release);
        self.published_seq.store(seq, Ordering::Release);
        seq
    }

    /// Read the value stored for `seq`, validating the per-slot sequence before and
    /// after the copy so that an overwrite during the read is detected (seqlock-style).
    #[inline]
    fn read_seq_inner(&self, seq: u32) -> Option<T> {
        let idx = Self::idx_for(seq);

        let s1 = self.slot_seq[idx].load(Ordering::Acquire);
        if s1 != seq {
            return None;
        }

        let v = unsafe { (*self.slots[idx].get()).assume_init_read() };

        #[cfg(test)]
        self.test_after_read_hook(idx);

        let s2 = self.slot_seq[idx].load(Ordering::Acquire);
        if s2 != seq {
            return None;
        }

        Some(v)
    }

    /// Test-only hook: simulates an overwrite landing between the value copy and the
    /// sequence re-check in `read_seq_inner`.
    #[cfg(test)]
    fn test_after_read_hook(&self, idx: usize) {
        let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
        if target == self as *const _ as usize {
            let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
            self.slot_seq[idx].store(seq, Ordering::Release);
            TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
        }
    }
}

impl<T: Copy, const N: usize> Default for SeqRing<T, N> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for SeqRing<T, N> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("SeqRing")
            .field("capacity", &N)
            .field("published_seq", &self.published_seq.load(Ordering::Relaxed))
            .finish()
    }
}

/// Producer handle for writing into the ring.
///
/// This handle is `!Sync` to prevent concurrent producers.
pub struct Producer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    _not_sync: PhantomData<Cell<()>>,
}

impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
    /// Write a value into the ring.
    ///
    /// Returns the sequence number assigned to the write (never 0).
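    ///
    /// # Example
    /// A sketch (marked `ignore`; the import path is an assumption):
    ///
    /// ```ignore
    /// use ph_eventing::seq_ring::SeqRing;
    ///
    /// let ring = SeqRing::<u32, 4>::new();
    /// let producer = ring.producer();
    /// assert_eq!(producer.push(5), 1);
    /// assert_eq!(producer.push(6), 2);
    /// ```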
    #[inline]
    pub fn push(&self, value: T) -> u32 {
        self.ring.push_inner(value)
    }
}

impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
    fn drop(&mut self) {
        self.ring.producer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Producer<'_, T, N> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("seq_ring::Producer")
            .field("capacity", &N)
            .finish()
    }
}

/// Consumer handle for reading from the ring.
///
/// This handle is `!Sync` to prevent concurrent consumers.
pub struct Consumer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    last_seq: u32,
    dropped_accum: usize,
    _not_sync: PhantomData<Cell<()>>,
}

impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
    /// How many items have been dropped since consumer creation (or since reset).
    #[inline]
    pub fn dropped(&self) -> usize {
        self.dropped_accum
    }

    /// Reset the internal drop counter.
    #[inline]
    pub fn reset_dropped(&mut self) {
        self.dropped_accum = 0;
    }

    /// Drain at most one item (in-order).
    /// Returns true if an item was delivered to the hook.
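    ///
    /// # Example
    /// A sketch (marked `ignore`; the import path is an assumption):
    ///
    /// ```ignore
    /// use ph_eventing::seq_ring::SeqRing;
    ///
    /// let ring = SeqRing::<u32, 4>::new();
    /// let producer = ring.producer();
    /// let mut consumer = ring.consumer();
    ///
    /// producer.push(7);
    ///
    /// let mut got = None;
    /// let delivered = consumer.poll_one(|seq, v| got = Some((seq, *v)));
    /// assert!(delivered);
    /// assert_eq!(got, Some((1, 7)));
    /// ```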
    #[inline]
    pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
        let mut hook = Some(hook);
        let stats = self.poll_up_to(1, |seq, v| {
            if let Some(hook) = hook.take() {
                hook(seq, v);
            }
        });
        stats.read == 1
    }

    /// Drain up to `max` items (in-order).
    /// Hook sees `&T` but it is a reference to a **local copy** inside poll.
    ///
    /// If `max == 0`, this returns immediately with `read = 0`, `dropped = 0`, and
    /// `newest` set to the latest published sequence.
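    ///
    /// # Example
    /// A sketch of the lag/drop behaviour (marked `ignore`; the import path is an assumption):
    ///
    /// ```ignore
    /// use ph_eventing::seq_ring::SeqRing;
    ///
    /// let ring = SeqRing::<u32, 4>::new();
    /// let producer = ring.producer();
    /// let mut consumer = ring.consumer();
    ///
    /// // Push 6 items into a 4-slot ring: the 2 oldest are overwritten.
    /// for i in 0..6 {
    ///     producer.push(i);
    /// }
    ///
    /// let stats = consumer.poll_up_to(usize::MAX, |_seq, _v| {});
    /// assert_eq!(stats.read, 4);
    /// assert_eq!(stats.dropped, 2);
    /// ```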
    pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
        if max == 0 {
            return PollStats {
                read: 0,
                dropped: 0,
                newest: self.ring.newest_seq(),
            };
        }

        let mut newest = self.ring.newest_seq();
        if newest == 0 || newest == self.last_seq {
            return PollStats {
                read: 0,
                dropped: 0,
                newest,
            };
        }

        let mut read = 0usize;
        let mut dropped = 0usize;

        while read < max {
            newest = self.ring.newest_seq();
            if self.last_seq == newest {
                break;
            }

            let lag = newest.wrapping_sub(self.last_seq) as usize;
            if lag > N {
                let next = self.last_seq.wrapping_add(1);
                let keep_from = newest.wrapping_sub((N - 1) as u32);
                let jump_drops = keep_from.wrapping_sub(next) as usize;
                dropped += jump_drops;
                self.last_seq = keep_from.wrapping_sub(1);
                continue;
            }

            let next = self.last_seq.wrapping_add(1);

            match self.ring.read_seq_inner(next) {
                Some(v) => {
                    hook(next, &v);
                    self.last_seq = next;
                    read += 1;
                }
                None => {
                    self.last_seq = next;
                    dropped += 1;
                }
            }
        }

        self.dropped_accum += dropped;

        PollStats {
            read,
            dropped,
            newest,
        }
    }

    /// "Give me the newest thing right now" (not in-order).
    /// Returns true if it delivered something.
    ///
    /// This does not advance the consumer cursor.
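    ///
    /// # Example
    /// A sketch (marked `ignore`; the import path is an assumption):
    ///
    /// ```ignore
    /// use ph_eventing::seq_ring::SeqRing;
    ///
    /// let ring = SeqRing::<u32, 8>::new();
    /// let producer = ring.producer();
    /// let consumer = ring.consumer();
    ///
    /// producer.push(1);
    /// producer.push(2);
    ///
    /// let mut newest = None;
    /// assert!(consumer.latest(|seq, v| newest = Some((seq, *v))));
    /// assert_eq!(newest, Some((2, 2)));
    /// ```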
    #[inline]
    pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
        let newest = self.ring.newest_seq();
        if newest == 0 {
            return false;
        }
        if let Some(v) = self.ring.read_seq_inner(newest) {
            hook(newest, &v);
            true
        } else {
            false
        }
    }

    /// Fast-forward consumer so the *next* `poll_one()` yields the newest item
    /// (i.e. skip backlog).
    ///
    /// This does not modify the dropped counter.
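    ///
    /// # Example
    /// A sketch (marked `ignore`; the import path is an assumption):
    ///
    /// ```ignore
    /// use ph_eventing::seq_ring::SeqRing;
    ///
    /// let ring = SeqRing::<u32, 8>::new();
    /// let producer = ring.producer();
    /// let mut consumer = ring.consumer();
    ///
    /// producer.push(10);
    /// producer.push(11);
    /// producer.push(12);
    ///
    /// consumer.skip_to_latest();
    ///
    /// let mut got = None;
    /// consumer.poll_one(|seq, v| got = Some((seq, *v)));
    /// assert_eq!(got, Some((3, 12)));
    /// ```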
    #[inline]
    pub fn skip_to_latest(&mut self) {
        let newest = self.ring.newest_seq();
        if newest != 0 {
            self.last_seq = newest.wrapping_sub(1);
        }
    }
}

impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
    fn drop(&mut self) {
        self.ring.consumer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Consumer<'_, T, N> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("seq_ring::Consumer")
            .field("capacity", &N)
            .field("last_seq", &self.last_seq)
            .field("dropped", &self.dropped_accum)
            .finish()
    }
}

impl<T: Copy, const N: usize> crate::traits::Sink<T> for Producer<'_, T, N> {
    type Error = core::convert::Infallible;

    #[inline]
    fn try_push(&mut self, val: T) -> Result<(), core::convert::Infallible> {
        self.push(val);
        Ok(())
    }
}

impl<T: Copy, const N: usize> crate::traits::Source<T> for Consumer<'_, T, N> {
    #[inline]
    fn try_pop(&mut self) -> Option<T> {
        let mut result = None;
        self.poll_one(|_seq, v| result = Some(*v));
        result
    }
}

#[cfg(test)]
mod tests {
    use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
    use core::sync::atomic::Ordering;
    use std::vec::Vec;

    #[test]
    fn poll_one_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();
        let ok = consumer.poll_one(|_, _| {});
        assert!(!ok);
    }

    #[test]
    fn polls_in_order() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 3);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 3);
        assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
    }

    #[test]
    fn drops_when_consumer_lags() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..10 {
            producer.push(i);
        }

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 4);
        assert_eq!(stats.dropped, 6);
        assert_eq!(stats.newest, 10);
        assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
    }

    #[test]
    fn latest_reads_newest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let consumer = ring.consumer();

        producer.push(1);
        producer.push(2);

        let mut got = None;
        let ok = consumer.latest(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((2, 2)));
    }

    #[test]
    fn skip_to_latest_makes_next_poll_latest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        consumer.skip_to_latest();

        let mut got = None;
        let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((3, 12)));
    }

    #[test]
    fn poll_up_to_zero_returns_newest_only() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(42);

        let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 1);
    }

    #[test]
    fn dropped_counter_can_reset() {
        let ring = SeqRing::<u32, 2>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..5 {
            producer.push(i);
        }

        let stats = consumer.poll_up_to(10, |_, _| {});

        assert_eq!(consumer.dropped(), stats.dropped);

        consumer.reset_dropped();

        assert_eq!(consumer.dropped(), 0);
    }

    #[test]
    fn latest_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    #[test]
    fn latest_returns_false_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    #[test]
    fn poll_up_to_counts_dropped_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();

        ring.published_seq.store(1, Ordering::Release);

        let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 1);
        assert_eq!(consumer.dropped(), 1);
    }

    #[test]
    fn read_seq_inner_detects_overwrite_during_read() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let seq = producer.push(7);

        TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
        TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);

        let got = ring.read_seq_inner(seq);

        TEST_AFTER_READ_TARGET.store(0, Ordering::Release);

        assert!(got.is_none());
    }

    #[test]
    fn push_wraps_seq_from_zero_to_one() {
        let ring = SeqRing::<u32, 4>::new();

        ring.next_seq.store(u32::MAX, Ordering::Relaxed);

        let seq = ring.producer().push(1);

        assert_eq!(seq, 1);
        assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn capacity_returns_n() {
        let ring = SeqRing::<u32, 8>::new();
        assert_eq!(ring.capacity(), 8);
    }
}