Skip to main content

ph_eventing/
event_buf.rs

1//! Bounded SPSC event buffer with backpressure — no heap, no alloc.
2//!
3//! [`EventBuf`] is a fixed-size, lock-free, single-producer single-consumer
4//! ring buffer that **rejects** pushes when full instead of overwriting.
5//! This gives the producer explicit backpressure so no events are silently
6//! lost.
7//!
8//! # When to use
9//! Use `EventBuf` when every event matters and the producer can afford to
10//! handle a "buffer full" signal (retry, log, or apply its own policy).
11//! If losing old events is acceptable, prefer [`crate::SeqRing`].
12//! If you only need a single-owner ring, see [`crate::RingBuf`].
13//!
14//! # Memory ordering
15//! This is a classic Lamport SPSC queue:
16//! - The producer owns `head` (Relaxed load, Release store) and reads
17//!   `tail` with Acquire to see consumer progress.
18//! - The consumer owns `tail` (Relaxed load, Release store) and reads
19//!   `head` with Acquire to see producer progress.
20//! - A slot is written before `head` is advanced and read before `tail` is
21//!   advanced, so the Release/Acquire pairs on the cursors act as the
22//!   publication fence.
23//!
24//! # Example
25//! ```
26//! use ph_eventing::EventBuf;
27//!
28//! let buf = EventBuf::<u32, 4>::new();
29//! let producer = buf.producer();
30//! let consumer = buf.consumer();
31//!
32//! assert!(producer.push(1).is_ok());
33//! assert!(producer.push(2).is_ok());
34//! assert_eq!(consumer.pop(), Some(1));
35//! assert_eq!(consumer.pop(), Some(2));
36//! assert_eq!(consumer.pop(), None); // empty
37//! ```
38
39use core::cell::{Cell, UnsafeCell};
40use core::marker::PhantomData;
41use core::mem::MaybeUninit;
42use core::sync::atomic::Ordering;
43#[cfg(target_has_atomic = "32")]
44use core::sync::atomic::{AtomicBool, AtomicU32};
45#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
46use portable_atomic::{AtomicBool, AtomicU32};
47
/// Build the backing storage: `N` slots, every one of them uninitialized.
fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
}

/// Bounded SPSC event buffer with backpressure.
///
/// When the buffer is full, [`Producer::push`] returns `Err(val)` instead
/// of overwriting, giving the producer a chance to retry, drop, or log.
/// The consumer drains items with [`Consumer::pop`] or [`Consumer::drain`].
///
/// # Panics
/// - `EventBuf::new()` panics if `N == 0` or `N > 2^31`.
/// - `producer()` / `consumer()` panic if called while another handle of
///   the same kind is already active.
pub struct EventBuf<T: Copy, const N: usize> {
    /// Position of the next slot to write; stored only by `Producer::push`.
    head: AtomicU32,
    /// Position of the next slot to read; stored only by `Consumer::pop`.
    tail: AtomicU32,
    /// Slot storage; a slot holds a valid `T` only between its push and pop.
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    /// `true` while a `Producer` handle is alive.
    producer_taken: AtomicBool,
    /// `true` while a `Consumer` handle is alive.
    consumer_taken: AtomicBool,
}

// SAFETY: EventBuf is Sync because the producer/consumer handles enforce
// SPSC usage, and the head/tail cursors are accessed via atomics with
// Release/Acquire ordering that guarantees slot visibility. T: Send ensures
// values can be transferred across threads safely.
unsafe impl<T: Copy + Send, const N: usize> Sync for EventBuf<T, N> {}

impl<T: Copy, const N: usize> EventBuf<T, N> {
    /// Highest value a cursor takes before it restarts at zero.
    ///
    /// Cursors count modulo the largest multiple of `N` that fits in the
    /// `u32` range. Because that modulus is a multiple of `N`, the slot
    /// sequence stays contiguous across the wrap for *every* capacity; a
    /// plain `u32` wrap only has that property when `N` divides 2^32, and
    /// otherwise makes two consecutive positions share one slot.
    /// For power-of-two `N` this is `u32::MAX`, i.e. ordinary wrapping.
    const LAST_CURSOR: u32 = {
        let n = N as u64;
        let range = 1u64 << 32;
        if n == 0 || n > range {
            // Unusable capacities are rejected by `new()`; this value is
            // never consulted for them.
            u32::MAX
        } else {
            ((range / n) * n - 1) as u32
        }
    };

    /// Create a new, empty event buffer.
    ///
    /// # Panics
    /// Panics if `N == 0`, or if `N > 2^31` (the `u32` cursors could no
    /// longer tell a full buffer from an empty one).
    pub fn new() -> Self {
        assert!(N > 0, "EventBuf capacity N must be > 0");
        assert!(
            (N as u64) <= (1u64 << 31),
            "EventBuf capacity N must be <= 2^31"
        );
        Self {
            head: AtomicU32::new(0),
            tail: AtomicU32::new(0),
            slots: unsafe_cell_array::<T, N>(),
            producer_taken: AtomicBool::new(false),
            consumer_taken: AtomicBool::new(false),
        }
    }

    /// Map a cursor position to its slot index.
    #[inline(always)]
    const fn slot_index(pos: u32) -> usize {
        // Reduce in u32 first so a narrow `usize` cannot truncate `pos`
        // before the modulo is taken.
        (pos % (N as u32)) as usize
    }

    /// Cursor that follows `pos`, wrapping after [`Self::LAST_CURSOR`].
    #[inline(always)]
    const fn next_cursor(pos: u32) -> u32 {
        if pos >= Self::LAST_CURSOR {
            0
        } else {
            pos + 1
        }
    }

    /// Number of positions from `tail` forward to `head`, modulo the cursor
    /// range.
    #[inline(always)]
    const fn occupied(head: u32, tail: u32) -> u32 {
        let raw = head.wrapping_sub(tail);
        if head >= tail {
            raw
        } else {
            // `head` has wrapped and `tail` has not: discount the cursor
            // values between LAST_CURSOR and u32::MAX that are never used.
            raw.wrapping_sub(u32::MAX - Self::LAST_CURSOR)
        }
    }

    /// Maximum number of items the buffer can hold.
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Approximate number of items currently buffered.
    ///
    /// This is a snapshot — by the time the caller acts on it the value may
    /// already be stale. The result never exceeds [`capacity`](Self::capacity).
    #[inline]
    pub fn len(&self) -> usize {
        // `tail` is read first (Acquire) so the `head` read that follows is
        // not older than it; reading them the other way round lets a stale
        // `head` fall behind `tail` and wrap the difference to ~2^32.
        let t = self.tail.load(Ordering::Acquire);
        let h = self.head.load(Ordering::Acquire);
        // Both cursors may still move between the two loads, so clamp.
        (Self::occupied(h, t) as usize).min(N)
    }

    /// Returns `true` if the buffer contains no items (approximate).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` if the buffer is at capacity (approximate).
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() >= N
    }

    /// Create the producer handle. Only one producer may be active.
    ///
    /// # Panics
    /// Panics if a producer handle is already active.
    #[inline]
    pub fn producer(&self) -> Producer<'_, T, N> {
        assert!(
            !self.producer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Producer may be active at a time"
        );
        Producer {
            buf: self,
            _not_sync: PhantomData,
        }
    }

    /// Create the consumer handle. Only one consumer may be active.
    ///
    /// # Panics
    /// Panics if a consumer handle is already active.
    #[inline]
    pub fn consumer(&self) -> Consumer<'_, T, N> {
        assert!(
            !self.consumer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Consumer may be active at a time"
        );
        Consumer {
            buf: self,
            _not_sync: PhantomData,
        }
    }
}

impl<T: Copy, const N: usize> Default for EventBuf<T, N> {
    /// Equivalent to [`EventBuf::new`], including its panics.
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for EventBuf<T, N> {
    /// Prints the current (approximate) length and the capacity; slot
    /// contents are not shown.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("EventBuf")
            .field("len", &self.len())
            .field("capacity", &N)
            .finish()
    }
}

/// Write handle for an [`EventBuf`].
///
/// Dropping the producer releases the slot so a new one can be created.
pub struct Producer<'a, T: Copy, const N: usize> {
    buf: &'a EventBuf<T, N>,
    // `Cell` is `!Sync`, so a shared `&Producer` cannot cross threads.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Producer<'_, T, N> {
    /// Try to push a value into the buffer.
    ///
    /// Returns `Ok(())` on success, or `Err(val)` if the buffer is full
    /// (the value is returned to the caller so nothing is lost).
    #[inline]
    pub fn push(&self, val: T) -> Result<(), T> {
        let head = self.buf.head.load(Ordering::Relaxed);
        let tail = self.buf.tail.load(Ordering::Acquire);
        if EventBuf::<T, N>::occupied(head, tail) as usize >= N {
            return Err(val);
        }
        let idx = EventBuf::<T, N>::slot_index(head);
        // SAFETY: producer is the only writer to this slot; the consumer
        // will not read it until head is advanced (Release below).
        unsafe {
            (*self.buf.slots[idx].get()).write(val);
        }
        self.buf
            .head
            .store(EventBuf::<T, N>::next_cursor(head), Ordering::Release);
        Ok(())
    }
}

impl<T: Copy, const N: usize> Drop for Producer<'_, T, N> {
    /// Clears the "producer taken" flag so `producer()` can succeed again.
    fn drop(&mut self) {
        self.buf.producer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Producer<'_, T, N> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("event_buf::Producer")
            .field("capacity", &N)
            .finish()
    }
}

/// Read handle for an [`EventBuf`].
///
/// Dropping the consumer releases the slot so a new one can be created.
pub struct Consumer<'a, T: Copy, const N: usize> {
    buf: &'a EventBuf<T, N>,
    // `Cell` is `!Sync`, so a shared `&Consumer` cannot cross threads.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Consumer<'_, T, N> {
    /// Pop the oldest item from the buffer.
    ///
    /// Returns `None` if the buffer is empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        let tail = self.buf.tail.load(Ordering::Relaxed);
        let head = self.buf.head.load(Ordering::Acquire);
        if tail == head {
            return None;
        }
        let idx = EventBuf::<T, N>::slot_index(tail);
        // SAFETY: consumer is the only reader of this slot; the producer
        // will not overwrite it until tail is advanced (Release below).
        let val = unsafe { (*self.buf.slots[idx].get()).assume_init_read() };
        self.buf
            .tail
            .store(EventBuf::<T, N>::next_cursor(tail), Ordering::Release);
        Some(val)
    }

    /// Drain up to `max` items, passing each to `hook`.
    ///
    /// Stops early when the buffer runs empty. Returns the number of items
    /// consumed.
    #[inline]
    pub fn drain(&self, max: usize, mut hook: impl FnMut(T)) -> usize {
        let mut count = 0;
        while count < max {
            match self.pop() {
                Some(val) => {
                    hook(val);
                    count += 1;
                }
                None => break,
            }
        }
        count
    }
}

impl<T: Copy, const N: usize> Drop for Consumer<'_, T, N> {
    /// Clears the "consumer taken" flag so `consumer()` can succeed again.
    fn drop(&mut self) {
        self.buf.consumer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Consumer<'_, T, N> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("event_buf::Consumer")
            .field("capacity", &N)
            .finish()
    }
}
278
279impl<T: Copy, const N: usize> crate::traits::Sink<T> for Producer<'_, T, N> {
280    type Error = T;
281
282    #[inline]
283    fn try_push(&mut self, val: T) -> Result<(), T> {
284        self.push(val)
285    }
286}
287
288impl<T: Copy, const N: usize> crate::traits::Source<T> for Consumer<'_, T, N> {
289    #[inline]
290    fn try_pop(&mut self) -> Option<T> {
291        self.pop()
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built buffer is empty, not full, and reports its capacity.
    #[test]
    fn new_buf_is_empty() {
        let ring = EventBuf::<u32, 4>::new();
        assert!(ring.is_empty());
        assert!(!ring.is_full());
        assert_eq!(ring.len(), 0);
        assert_eq!(ring.capacity(), 4);
    }

    /// Items come out in the order they went in.
    #[test]
    fn push_and_pop_fifo() {
        let ring = EventBuf::<u32, 4>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        for value in [10, 20, 30] {
            assert!(tx.push(value).is_ok());
        }

        for value in [10, 20, 30] {
            assert_eq!(rx.pop(), Some(value));
        }
        assert_eq!(rx.pop(), None);
    }

    /// A full buffer hands the value back instead of overwriting.
    #[test]
    fn push_rejects_when_full() {
        let ring = EventBuf::<u32, 2>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        // full — value returned
        assert_eq!(tx.push(3), Err(3));

        // freeing one slot lets the retried push through
        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(3).is_ok());
    }

    /// `drain` honours `max` and reports how many items it delivered.
    #[test]
    fn drain_returns_count() {
        let ring = EventBuf::<u32, 8>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        (0..5).for_each(|i| tx.push(i).unwrap());

        let mut seen = std::vec::Vec::new();
        let first = rx.drain(3, |v| seen.push(v));
        assert_eq!(first, 3);
        assert_eq!(seen, [0, 1, 2]);

        // the rest, with a limit larger than what is buffered
        let second = rx.drain(100, |v| seen.push(v));
        assert_eq!(second, 2);
        assert_eq!(seen, [0, 1, 2, 3, 4]);
    }

    /// Draining an empty buffer never invokes the hook.
    #[test]
    fn drain_on_empty_returns_zero() {
        let ring = EventBuf::<u32, 4>::new();
        let _tx = ring.producer();
        let rx = ring.consumer();

        let drained = rx.drain(10, |_| panic!("should not be called"));
        assert_eq!(drained, 0);
    }

    /// Dropping a handle frees its role so a replacement can be taken.
    #[test]
    fn producer_consumer_can_be_recreated() {
        let ring = EventBuf::<u32, 4>::new();
        {
            let tx = ring.producer();
            tx.push(1).unwrap();
        }
        // first producer is gone — a second one may be taken
        let tx = ring.producer();
        tx.push(2).unwrap();

        {
            let rx = ring.consumer();
            assert_eq!(rx.pop(), Some(1));
        }
        // first consumer is gone — a second one may be taken
        let rx = ring.consumer();
        assert_eq!(rx.pop(), Some(2));
        assert_eq!(rx.pop(), None);
    }

    /// Two live producers are refused.
    #[test]
    #[should_panic(expected = "only one Producer")]
    fn double_producer_panics() {
        let ring = EventBuf::<u32, 4>::new();
        let _first = ring.producer();
        let _second = ring.producer();
    }

    /// Two live consumers are refused.
    #[test]
    #[should_panic(expected = "only one Consumer")]
    fn double_consumer_panics() {
        let ring = EventBuf::<u32, 4>::new();
        let _first = ring.consumer();
        let _second = ring.consumer();
    }

    /// Repeated fill/empty cycles reuse the slots without mixing items up.
    #[test]
    fn wraps_around_correctly() {
        let ring = EventBuf::<u32, 3>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        // fill, drain, fill again — exercises the wrap
        for round in 0u32..4 {
            let batch = (round * 3)..(round * 3 + 3);
            for value in batch.clone() {
                assert!(tx.push(value).is_ok());
            }
            assert_eq!(tx.push(99), Err(99)); // full
            for value in batch {
                assert_eq!(rx.pop(), Some(value));
            }
            assert_eq!(rx.pop(), None); // empty
        }
    }

    /// `Default` builds an empty buffer.
    #[test]
    fn default_is_new() {
        let ring: EventBuf<u8, 4> = EventBuf::default();
        assert!(ring.is_empty());
    }

    /// `len`, `is_empty` and `is_full` follow pushes and pops.
    #[test]
    fn len_and_full_track_state() {
        let ring = EventBuf::<u32, 3>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());

        tx.push(1).unwrap();
        assert_eq!(ring.len(), 1);

        tx.push(2).unwrap();
        tx.push(3).unwrap();
        assert_eq!(ring.len(), 3);
        assert!(ring.is_full());

        rx.pop();
        assert_eq!(ring.len(), 2);
        assert!(!ring.is_full());
    }

    /// Compile-time check: both handle types implement `Send`.
    #[test]
    fn handles_are_send() {
        fn assert_send<T: Send>() {}
        assert_send::<super::Producer<'_, u32, 4>>();
        assert_send::<super::Consumer<'_, u32, 4>>();
    }
}