1use core::cell::{Cell, UnsafeCell};
20use core::marker::PhantomData;
21use core::mem::MaybeUninit;
22#[cfg(test)]
23use core::sync::atomic::AtomicUsize;
24use core::sync::atomic::Ordering;
25#[cfg(target_has_atomic = "32")]
26use core::sync::atomic::{AtomicBool, AtomicU32};
27#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
28use portable_atomic::{AtomicBool, AtomicU32};
29
30fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
31 core::array::from_fn(|_| AtomicU32::new(init))
32}
33
34fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
35 core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
36}
37
38#[cfg(test)]
39static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
40#[cfg(test)]
41static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);
42
43#[must_use]
44#[derive(Copy, Clone, Debug)]
45pub struct PollStats {
46 pub read: usize,
48 pub dropped: usize,
50 pub newest: u32,
52}
53
54pub struct SeqRing<T: Copy, const N: usize> {
57 next_seq: AtomicU32,
58 published_seq: AtomicU32,
59 slot_seq: [AtomicU32; N],
60 slots: [UnsafeCell<MaybeUninit<T>>; N],
61 producer_taken: AtomicBool,
62 consumer_taken: AtomicBool,
63}
64
65unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}
70
71impl<T: Copy, const N: usize> SeqRing<T, N> {
72 pub fn new() -> Self {
77 assert!(N > 0);
78 Self {
79 next_seq: AtomicU32::new(0),
80 published_seq: AtomicU32::new(0),
81 slot_seq: atomic_u32_array::<N>(0),
82 slots: unsafe_cell_array::<T, N>(),
83 producer_taken: AtomicBool::new(false),
84 consumer_taken: AtomicBool::new(false),
85 }
86 }
87
88 #[inline]
90 pub const fn capacity(&self) -> usize {
91 N
92 }
93
94 #[inline(always)]
95 const fn idx_for(seq: u32) -> usize {
96 ((seq.wrapping_sub(1)) as usize) % N
97 }
98
99 #[inline]
104 pub fn producer(&self) -> Producer<'_, T, N> {
105 assert!(
106 !self.producer_taken.swap(true, Ordering::AcqRel),
107 "SeqRing::producer() called while a producer is active"
108 );
109 Producer {
110 ring: self,
111 _not_sync: PhantomData,
112 }
113 }
114
115 #[inline]
120 pub fn consumer(&self) -> Consumer<'_, T, N> {
121 assert!(
122 !self.consumer_taken.swap(true, Ordering::AcqRel),
123 "SeqRing::consumer() called while a consumer is active"
124 );
125 Consumer {
126 ring: self,
127 last_seq: 0,
128 dropped_accum: 0,
129 _not_sync: PhantomData,
130 }
131 }
132
133 #[inline]
134 fn newest_seq(&self) -> u32 {
135 self.published_seq.load(Ordering::Acquire)
136 }
137
138 #[inline]
139 fn push_inner(&self, value: T) -> u32 {
140 let mut seq = self
141 .next_seq
142 .fetch_add(1, Ordering::Relaxed)
143 .wrapping_add(1);
144 if seq == 0 {
145 seq = 1;
146 self.next_seq.store(1, Ordering::Relaxed);
147 }
148
149 let idx = Self::idx_for(seq);
150 unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };
151
152 self.slot_seq[idx].store(seq, Ordering::Release);
153 self.published_seq.store(seq, Ordering::Release);
154 seq
155 }
156
157 #[inline]
158 fn read_seq_inner(&self, seq: u32) -> Option<T> {
159 let idx = Self::idx_for(seq);
160
161 let s1 = self.slot_seq[idx].load(Ordering::Acquire);
162 if s1 != seq {
163 return None;
164 }
165
166 let v = unsafe { (*self.slots[idx].get()).assume_init_read() };
167
168 #[cfg(test)]
169 self.test_after_read_hook(idx);
170
171 let s2 = self.slot_seq[idx].load(Ordering::Acquire);
172 if s2 != seq {
173 return None;
174 }
175
176 Some(v)
177 }
178
179 #[cfg(test)]
180 fn test_after_read_hook(&self, idx: usize) {
181 let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
182 if target == self as *const _ as usize {
183 let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
184 self.slot_seq[idx].store(seq, Ordering::Release);
185 TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
186 }
187 }
188}
189
190impl<T: Copy, const N: usize> Default for SeqRing<T, N> {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl<T: Copy, const N: usize> core::fmt::Debug for SeqRing<T, N> {
197 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
198 f.debug_struct("SeqRing")
199 .field("capacity", &N)
200 .field("published_seq", &self.published_seq.load(Ordering::Relaxed))
201 .finish()
202 }
203}
204
205pub struct Producer<'a, T: Copy, const N: usize> {
209 ring: &'a SeqRing<T, N>,
210 _not_sync: PhantomData<Cell<()>>,
211}
212
213impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
214 #[inline]
218 pub fn push(&self, value: T) -> u32 {
219 self.ring.push_inner(value)
220 }
221}
222
223impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
224 fn drop(&mut self) {
225 self.ring.producer_taken.store(false, Ordering::Release);
226 }
227}
228
229impl<T: Copy, const N: usize> core::fmt::Debug for Producer<'_, T, N> {
230 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
231 f.debug_struct("seq_ring::Producer")
232 .field("capacity", &N)
233 .finish()
234 }
235}
236
237pub struct Consumer<'a, T: Copy, const N: usize> {
241 ring: &'a SeqRing<T, N>,
242 last_seq: u32,
243 dropped_accum: usize,
244 _not_sync: PhantomData<Cell<()>>,
245}
246
247impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
248 #[inline]
250 pub fn dropped(&self) -> usize {
251 self.dropped_accum
252 }
253
254 #[inline]
256 pub fn reset_dropped(&mut self) {
257 self.dropped_accum = 0;
258 }
259
260 #[inline]
263 pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
264 let mut hook = Some(hook);
265 let stats = self.poll_up_to(1, |seq, v| {
266 if let Some(hook) = hook.take() {
267 hook(seq, v);
268 }
269 });
270 stats.read == 1
271 }
272
273 pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
279 if max == 0 {
280 return PollStats {
281 read: 0,
282 dropped: 0,
283 newest: self.ring.newest_seq(),
284 };
285 }
286
287 let mut newest = self.ring.newest_seq();
288 if newest == 0 || newest == self.last_seq {
289 return PollStats {
290 read: 0,
291 dropped: 0,
292 newest,
293 };
294 }
295
296 let mut read = 0usize;
297 let mut dropped = 0usize;
298
299 while read < max {
300 newest = self.ring.newest_seq();
301 if self.last_seq == newest {
302 break;
303 }
304
305 let lag = newest.wrapping_sub(self.last_seq) as usize;
306 if lag > N {
307 let next = self.last_seq.wrapping_add(1);
308 let keep_from = newest.wrapping_sub((N - 1) as u32);
309 let jump_drops = keep_from.wrapping_sub(next) as usize;
310 dropped += jump_drops;
311 self.last_seq = keep_from.wrapping_sub(1);
312 continue;
313 }
314
315 let next = self.last_seq.wrapping_add(1);
316
317 match self.ring.read_seq_inner(next) {
318 Some(v) => {
319 hook(next, &v);
320 self.last_seq = next;
321 read += 1;
322 }
323 None => {
324 self.last_seq = next;
325 dropped += 1;
326 }
327 }
328 }
329
330 self.dropped_accum += dropped;
331
332 PollStats {
333 read,
334 dropped,
335 newest,
336 }
337 }
338
339 #[inline]
344 pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
345 let newest = self.ring.newest_seq();
346 if newest == 0 {
347 return false;
348 }
349 if let Some(v) = self.ring.read_seq_inner(newest) {
350 hook(newest, &v);
351 true
352 } else {
353 false
354 }
355 }
356
357 #[inline]
362 pub fn skip_to_latest(&mut self) {
363 let newest = self.ring.newest_seq();
364 if newest != 0 {
365 self.last_seq = newest.wrapping_sub(1);
366 }
367 }
368}
369
370impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
371 fn drop(&mut self) {
372 self.ring.consumer_taken.store(false, Ordering::Release);
373 }
374}
375
376impl<T: Copy, const N: usize> core::fmt::Debug for Consumer<'_, T, N> {
377 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
378 f.debug_struct("seq_ring::Consumer")
379 .field("capacity", &N)
380 .field("last_seq", &self.last_seq)
381 .field("dropped", &self.dropped_accum)
382 .finish()
383 }
384}
385
386impl<T: Copy, const N: usize> crate::traits::Sink<T> for Producer<'_, T, N> {
387 type Error = core::convert::Infallible;
388
389 #[inline]
390 fn try_push(&mut self, val: T) -> Result<(), core::convert::Infallible> {
391 self.push(val);
392 Ok(())
393 }
394}
395
396impl<T: Copy, const N: usize> crate::traits::Source<T> for Consumer<'_, T, N> {
397 #[inline]
398 fn try_pop(&mut self) -> Option<T> {
399 let mut result = None;
400 self.poll_one(|_seq, v| result = Some(*v));
401 result
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
408 use core::sync::atomic::Ordering;
409 use std::vec::Vec;
410
411 #[test]
412 fn poll_one_empty_returns_false() {
413 let ring = SeqRing::<u32, 4>::new();
414 let mut consumer = ring.consumer();
415 let ok = consumer.poll_one(|_, _| {});
416 assert!(!ok);
417 }
418
419 #[test]
420 fn polls_in_order() {
421 let ring = SeqRing::<u32, 8>::new();
422 let producer = ring.producer();
423 let mut consumer = ring.consumer();
424
425 producer.push(10);
426 producer.push(11);
427 producer.push(12);
428
429 let mut seen = Vec::new();
430 let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));
431
432 assert_eq!(stats.read, 3);
433 assert_eq!(stats.dropped, 0);
434 assert_eq!(stats.newest, 3);
435 assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
436 }
437
438 #[test]
439 fn drops_when_consumer_lags() {
440 let ring = SeqRing::<u32, 4>::new();
441 let producer = ring.producer();
442 let mut consumer = ring.consumer();
443
444 for i in 0..10 {
445 producer.push(i);
446 }
447
448 let mut seen = Vec::new();
449 let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));
450
451 assert_eq!(stats.read, 4);
452 assert_eq!(stats.dropped, 6);
453 assert_eq!(stats.newest, 10);
454 assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
455 }
456
457 #[test]
458 fn latest_reads_newest() {
459 let ring = SeqRing::<u32, 8>::new();
460 let producer = ring.producer();
461 let consumer = ring.consumer();
462
463 producer.push(1);
464 producer.push(2);
465
466 let mut got = None;
467 let ok = consumer.latest(|seq, v| got = Some((seq, *v)));
468
469 assert!(ok);
470 assert_eq!(got, Some((2, 2)));
471 }
472
473 #[test]
474 fn skip_to_latest_makes_next_poll_latest() {
475 let ring = SeqRing::<u32, 8>::new();
476 let producer = ring.producer();
477 let mut consumer = ring.consumer();
478
479 producer.push(10);
480 producer.push(11);
481 producer.push(12);
482
483 consumer.skip_to_latest();
484
485 let mut got = None;
486 let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));
487
488 assert!(ok);
489 assert_eq!(got, Some((3, 12)));
490 }
491
492 #[test]
493 fn poll_up_to_zero_returns_newest_only() {
494 let ring = SeqRing::<u32, 4>::new();
495 let producer = ring.producer();
496 let mut consumer = ring.consumer();
497
498 producer.push(42);
499
500 let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));
501
502 assert_eq!(stats.read, 0);
503 assert_eq!(stats.dropped, 0);
504 assert_eq!(stats.newest, 1);
505 }
506
507 #[test]
508 fn dropped_counter_can_reset() {
509 let ring = SeqRing::<u32, 2>::new();
510 let producer = ring.producer();
511 let mut consumer = ring.consumer();
512
513 for i in 0..5 {
514 producer.push(i);
515 }
516
517 let stats = consumer.poll_up_to(10, |_, _| {});
518
519 assert_eq!(consumer.dropped(), stats.dropped);
520
521 consumer.reset_dropped();
522
523 assert_eq!(consumer.dropped(), 0);
524 }
525
526 #[test]
527 fn latest_empty_returns_false() {
528 let ring = SeqRing::<u32, 4>::new();
529 let consumer = ring.consumer();
530
531 let ok = consumer.latest(|_, _| {});
532
533 assert!(!ok);
534 }
535
536 #[test]
537 fn latest_returns_false_when_slot_missing() {
538 let ring = SeqRing::<u32, 4>::new();
539 let consumer = ring.consumer();
540
541 ring.published_seq.store(1, Ordering::Release);
542
543 let ok = consumer.latest(|_, _| {});
544
545 assert!(!ok);
546 }
547
548 #[test]
549 fn poll_up_to_counts_dropped_when_slot_missing() {
550 let ring = SeqRing::<u32, 4>::new();
551 let mut consumer = ring.consumer();
552
553 ring.published_seq.store(1, Ordering::Release);
554
555 let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));
556
557 assert_eq!(stats.read, 0);
558 assert_eq!(stats.dropped, 1);
559 assert_eq!(consumer.dropped(), 1);
560 }
561
562 #[test]
563 fn read_seq_inner_detects_overwrite_during_read() {
564 let ring = SeqRing::<u32, 4>::new();
565 let producer = ring.producer();
566 let seq = producer.push(7);
567
568 TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
569 TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);
570
571 let got = ring.read_seq_inner(seq);
572
573 TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
574
575 assert!(got.is_none());
576 }
577
578 #[test]
579 fn push_wraps_seq_from_zero_to_one() {
580 let ring = SeqRing::<u32, 4>::new();
581
582 ring.next_seq.store(u32::MAX, Ordering::Relaxed);
583
584 let seq = ring.producer().push(1);
585
586 assert_eq!(seq, 1);
587 assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
588 }
589
590 #[test]
591 fn capacity_returns_n() {
592 let ring = SeqRing::<u32, 8>::new();
593 assert_eq!(ring.capacity(), 8);
594 }
595}