1use core::cell::{Cell, UnsafeCell};
40use core::marker::PhantomData;
41use core::mem::MaybeUninit;
42use core::sync::atomic::Ordering;
43#[cfg(target_has_atomic = "32")]
44use core::sync::atomic::{AtomicBool, AtomicU32};
45#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
46use portable_atomic::{AtomicBool, AtomicU32};
47
/// Builds the backing storage of an [`EventBuf`]: `N` slots, every one left uninitialised.
fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
}

/// Fixed-capacity FIFO ring buffer for one producer and one consumer.
///
/// Values are written through a [`Producer`] and read through a [`Consumer`]; the two
/// sides coordinate exclusively through the atomic `head` / `tail` cursors. At most one
/// handle of each kind can exist at a time (see [`EventBuf::producer`] and
/// [`EventBuf::consumer`]).
///
/// Both cursors count modulo `2 * N`. Keeping the modulus a multiple of `N` makes the
/// slot sequence continuous for *every* capacity, not only for powers of two, and the
/// factor of two keeps "empty" (`head == tail`) distinguishable from "full"
/// (`head` is `N` positions ahead of `tail`).
pub struct EventBuf<T: Copy, const N: usize> {
    /// Write cursor: position of the next slot to fill. Stored only by the producer.
    head: AtomicU32,
    /// Read cursor: position of the next slot to take. Stored only by the consumer.
    tail: AtomicU32,
    /// Element storage; the slots for positions in `tail..head` hold initialised values.
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    /// `true` while a [`Producer`] handle is alive.
    producer_taken: AtomicBool,
    /// `true` while a [`Consumer`] handle is alive.
    consumer_taken: AtomicBool,
}

// SAFETY: slot contents are only touched through `Producer::push` and `Consumer::pop`.
// `producer_taken` / `consumer_taken` admit one handle of each kind, the handles are
// `!Sync`, and the Release/Acquire pairs on `head` / `tail` order every slot access
// between the two sides. `T: Send` is required because values move across threads.
unsafe impl<T: Copy + Send, const N: usize> Sync for EventBuf<T, N> {}

impl<T: Copy, const N: usize> EventBuf<T, N> {
    /// Largest supported capacity (`2^31`): the cursor modulus `2 * N` must not exceed
    /// the `2^32` distinct values of a `u32` cursor.
    const MAX_CAPACITY: u64 = 1 << 31;

    /// Modulus of both cursors (`2 * N`); saturating so the constant itself can never
    /// overflow, whatever `N` is.
    const CURSOR_WRAP: u64 = (N as u64).saturating_mul(2);

    /// Creates an empty buffer.
    ///
    /// # Panics
    ///
    /// Panics if `N == 0` or if `N` is larger than `2^31`.
    pub fn new() -> Self {
        assert!(N > 0, "EventBuf capacity N must be > 0");
        assert!(
            (N as u64) <= Self::MAX_CAPACITY,
            "EventBuf capacity N must be <= 2^31"
        );
        Self {
            head: AtomicU32::new(0),
            tail: AtomicU32::new(0),
            slots: unsafe_cell_array::<T, N>(),
            producer_taken: AtomicBool::new(false),
            consumer_taken: AtomicBool::new(false),
        }
    }

    /// Maps a cursor position to the index of its slot.
    #[inline(always)]
    const fn slot_index(pos: u32) -> usize {
        (pos as usize) % N
    }

    /// Returns the cursor position that follows `pos`, wrapping to `0` at `2 * N`.
    ///
    /// Because the modulus is a multiple of `N`, the slot index of the result is always
    /// the successor (mod `N`) of the slot index of `pos`.
    #[inline]
    const fn advance(pos: u32) -> u32 {
        // Widened so that `pos == u32::MAX` (reachable when `N == 2^31`) cannot overflow.
        let next = pos as u64 + 1;
        if next >= Self::CURSOR_WRAP {
            0
        } else {
            next as u32
        }
    }

    /// Number of positions from `tail` up to `head`, measured modulo `2 * N`.
    #[inline]
    const fn occupied(head: u32, tail: u32) -> usize {
        let head = head as u64;
        let tail = tail as u64;
        let used = if head >= tail {
            head - tail
        } else {
            // `head` has already wrapped past `2 * N` while `tail` has not.
            head + Self::CURSOR_WRAP - tail
        };
        used as usize
    }

    /// Returns the fixed capacity `N`.
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Returns the number of buffered elements.
    ///
    /// The two cursors are read with separate relaxed loads, so while the other side is
    /// running the result is only a snapshot. It is clamped to the capacity, so a torn
    /// snapshot never reports more than `N` elements.
    #[inline]
    pub fn len(&self) -> usize {
        let h = self.head.load(Ordering::Relaxed);
        let t = self.tail.load(Ordering::Relaxed);
        Self::occupied(h, t).min(N)
    }

    /// Returns `true` when [`len`](Self::len) is zero.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when [`len`](Self::len) has reached the capacity.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() >= N
    }

    /// Hands out the producer handle.
    ///
    /// The handle releases its claim when dropped, after which a new one can be taken.
    ///
    /// # Panics
    ///
    /// Panics if another [`Producer`] for this buffer is still alive.
    #[inline]
    pub fn producer(&self) -> Producer<'_, T, N> {
        assert!(
            !self.producer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Producer may be active at a time"
        );
        Producer {
            buf: self,
            _not_sync: PhantomData,
        }
    }

    /// Hands out the consumer handle.
    ///
    /// # Panics
    ///
    /// Panics if another [`Consumer`] for this buffer is still alive.
    #[inline]
    pub fn consumer(&self) -> Consumer<'_, T, N> {
        assert!(
            !self.consumer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Consumer may be active at a time"
        );
        Consumer {
            buf: self,
            _not_sync: PhantomData,
        }
    }
}

impl<T: Copy, const N: usize> Default for EventBuf<T, N> {
    /// Same as [`EventBuf::new`], including its panics.
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for EventBuf<T, N> {
    /// Prints the current length and the capacity; the elements themselves are not shown.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("EventBuf")
            .field("len", &self.len())
            .field("capacity", &N)
            .finish()
    }
}

/// Write half of an [`EventBuf`], obtained from [`EventBuf::producer`].
pub struct Producer<'a, T: Copy, const N: usize> {
    /// The buffer this handle writes into.
    buf: &'a EventBuf<T, N>,
    /// `Cell` is `Send` but not `Sync`, so the handle can move to another thread but
    /// cannot be shared between threads by reference.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Producer<'_, T, N> {
    /// Appends `val` to the buffer.
    ///
    /// # Errors
    ///
    /// Returns `Err(val)`, handing the value back, when the buffer is full.
    #[inline]
    pub fn push(&self, val: T) -> Result<(), T> {
        // `head` is only stored by this side, so a relaxed load is enough.
        let head = self.buf.head.load(Ordering::Relaxed);
        // Acquire pairs with the Release store in `Consumer::pop`: once a slot is seen
        // as free, the consumer's read of it has completed.
        let tail = self.buf.tail.load(Ordering::Acquire);
        if EventBuf::<T, N>::occupied(head, tail) >= N {
            return Err(val);
        }
        let idx = EventBuf::<T, N>::slot_index(head);
        // SAFETY: this is the only producer, and the fullness check above shows that
        // position `head` lies outside the consumer's readable range `tail..head`, so
        // nothing else accesses this slot until `head` is published below.
        unsafe {
            (*self.buf.slots[idx].get()).write(val);
        }
        // Release publishes the slot write together with the new cursor value.
        self.buf
            .head
            .store(EventBuf::<T, N>::advance(head), Ordering::Release);
        Ok(())
    }
}

impl<T: Copy, const N: usize> Drop for Producer<'_, T, N> {
    /// Releases the producer claim so that a new handle can be taken.
    fn drop(&mut self) {
        self.buf.producer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Producer<'_, T, N> {
    /// Prints only the capacity of the underlying buffer.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("event_buf::Producer")
            .field("capacity", &N)
            .finish()
    }
}

/// Read half of an [`EventBuf`], obtained from [`EventBuf::consumer`].
pub struct Consumer<'a, T: Copy, const N: usize> {
    /// The buffer this handle reads from.
    buf: &'a EventBuf<T, N>,
    /// `Cell` is `Send` but not `Sync`, so the handle can move to another thread but
    /// cannot be shared between threads by reference.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Consumer<'_, T, N> {
    /// Removes and returns the oldest element, or `None` when the buffer is empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        // `tail` is only stored by this side, so a relaxed load is enough.
        let tail = self.buf.tail.load(Ordering::Relaxed);
        // Acquire pairs with the Release store in `Producer::push`, making the slot
        // contents written before that store visible here.
        let head = self.buf.head.load(Ordering::Acquire);
        if tail == head {
            return None;
        }
        let idx = EventBuf::<T, N>::slot_index(tail);
        // SAFETY: `tail != head`, so the producer has written this slot and published
        // it; it will not overwrite the slot before `tail` is advanced below. `T: Copy`,
        // so reading the value out leaves nothing that would need dropping.
        let val = unsafe { (*self.buf.slots[idx].get()).assume_init_read() };
        // Release hands the slot back to the producer only after the read above.
        self.buf
            .tail
            .store(EventBuf::<T, N>::advance(tail), Ordering::Release);
        Some(val)
    }

    /// Pops up to `max` elements, passing each one to `hook` in FIFO order.
    ///
    /// Stops early when the buffer runs empty and returns how many elements were
    /// delivered.
    #[inline]
    pub fn drain(&self, max: usize, mut hook: impl FnMut(T)) -> usize {
        let mut count = 0;
        while count < max {
            match self.pop() {
                Some(val) => {
                    hook(val);
                    count += 1;
                }
                None => break,
            }
        }
        count
    }
}
264
265impl<T: Copy, const N: usize> Drop for Consumer<'_, T, N> {
266 fn drop(&mut self) {
267 self.buf.consumer_taken.store(false, Ordering::Release);
268 }
269}
270
271impl<T: Copy, const N: usize> core::fmt::Debug for Consumer<'_, T, N> {
272 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
273 f.debug_struct("event_buf::Consumer")
274 .field("capacity", &N)
275 .finish()
276 }
277}
278
/// Adapter that lets a [`Producer`] be used through the crate's `Sink` trait.
impl<T: Copy, const N: usize> crate::traits::Sink<T> for Producer<'_, T, N> {
    /// The error carries a `T`: `push` hands the rejected value back to the caller.
    type Error = T;

    /// Forwards `val` to `self.push` and returns its result unchanged.
    #[inline]
    fn try_push(&mut self, val: T) -> Result<(), T> {
        self.push(val)
    }
}
287
/// Adapter that lets a [`Consumer`] be used through the crate's `Source` trait.
impl<T: Copy, const N: usize> crate::traits::Source<T> for Consumer<'_, T, N> {
    /// Forwards to `self.pop` and returns its result unchanged (`None` when nothing is
    /// buffered).
    #[inline]
    fn try_pop(&mut self) -> Option<T> {
        self.pop()
    }
}
294
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built buffer reports no content and its compile-time capacity.
    #[test]
    fn new_buf_is_empty() {
        let ring = EventBuf::<u32, 4>::new();
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
        assert!(!ring.is_full());
    }

    /// Values come out in the order they went in.
    #[test]
    fn push_and_pop_fifo() {
        let ring = EventBuf::<u32, 4>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        for value in [10, 20, 30] {
            assert!(tx.push(value).is_ok());
        }

        for expected in [10, 20, 30] {
            assert_eq!(rx.pop(), Some(expected));
        }
        assert_eq!(rx.pop(), None);
    }

    /// A full buffer hands the value back, and accepts it once a slot is freed.
    #[test]
    fn push_rejects_when_full() {
        let ring = EventBuf::<u32, 2>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        assert!(tx.push(1).is_ok());
        assert!(tx.push(2).is_ok());
        assert_eq!(tx.push(3), Err(3));

        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(3).is_ok());
    }

    /// `drain` honours its limit and reports how many values it delivered.
    #[test]
    fn drain_returns_count() {
        let ring = EventBuf::<u32, 8>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        (0..5).for_each(|value| tx.push(value).unwrap());

        let mut seen = std::vec::Vec::new();
        let first_batch = rx.drain(3, |value| seen.push(value));
        assert_eq!(first_batch, 3);
        assert_eq!(seen, [0, 1, 2]);

        let second_batch = rx.drain(100, |value| seen.push(value));
        assert_eq!(second_batch, 2);
        assert_eq!(seen, [0, 1, 2, 3, 4]);
    }

    /// Draining an empty buffer never invokes the hook.
    #[test]
    fn drain_on_empty_returns_zero() {
        let ring = EventBuf::<u32, 4>::new();
        let _tx = ring.producer();
        let rx = ring.consumer();

        let delivered = rx.drain(10, |_| panic!("should not be called"));
        assert_eq!(delivered, 0);
    }

    /// Dropping a handle frees its slot; buffered data survives the hand-over.
    #[test]
    fn producer_consumer_can_be_recreated() {
        let ring = EventBuf::<u32, 4>::new();

        let first_tx = ring.producer();
        first_tx.push(1).unwrap();
        drop(first_tx);

        let second_tx = ring.producer();
        second_tx.push(2).unwrap();

        let first_rx = ring.consumer();
        assert_eq!(first_rx.pop(), Some(1));
        drop(first_rx);

        let second_rx = ring.consumer();
        assert_eq!(second_rx.pop(), Some(2));
        assert_eq!(second_rx.pop(), None);
    }

    /// Requesting a second producer while the first is alive must panic.
    #[test]
    #[should_panic(expected = "only one Producer")]
    fn double_producer_panics() {
        let ring = EventBuf::<u32, 4>::new();
        let _first = ring.producer();
        let _second = ring.producer();
    }

    /// Requesting a second consumer while the first is alive must panic.
    #[test]
    #[should_panic(expected = "only one Consumer")]
    fn double_consumer_panics() {
        let ring = EventBuf::<u32, 4>::new();
        let _first = ring.consumer();
        let _second = ring.consumer();
    }

    /// Repeated fill/empty cycles on a non-power-of-two capacity keep FIFO order.
    #[test]
    fn wraps_around_correctly() {
        let ring = EventBuf::<u32, 3>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        for round in 0u32..4 {
            let first = round * 3;
            let batch = first..first + 3;

            for value in batch.clone() {
                assert!(tx.push(value).is_ok());
            }
            assert_eq!(tx.push(99), Err(99));

            for value in batch {
                assert_eq!(rx.pop(), Some(value));
            }
            assert_eq!(rx.pop(), None);
        }
    }

    /// `Default` yields an empty buffer.
    #[test]
    fn default_is_new() {
        let ring: EventBuf<u8, 4> = Default::default();
        assert!(ring.is_empty());
    }

    /// `len`, `is_empty` and `is_full` follow pushes and pops.
    #[test]
    fn len_and_full_track_state() {
        let ring = EventBuf::<u32, 3>::new();
        let tx = ring.producer();
        let rx = ring.consumer();

        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());

        tx.push(1).unwrap();
        assert_eq!(ring.len(), 1);

        tx.push(2).unwrap();
        tx.push(3).unwrap();
        assert_eq!(ring.len(), 3);
        assert!(ring.is_full());

        let _ = rx.pop();
        assert_eq!(ring.len(), 2);
        assert!(!ring.is_full());
    }

    /// Both handle types can be moved to another thread.
    #[test]
    fn handles_are_send() {
        fn require_send<H: Send>() {}
        require_send::<super::Producer<'_, u32, 4>>();
        require_send::<super::Consumer<'_, u32, 4>>();
    }
}