use core::cell::{Cell, UnsafeCell};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
#[cfg(test)]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
#[cfg(target_has_atomic = "32")]
use core::sync::atomic::{AtomicBool, AtomicU32};
#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
use portable_atomic::{AtomicBool, AtomicU32};

fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
    core::array::from_fn(|_| AtomicU32::new(init))
}

fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
}
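// These two statics drive the test-only hook below: a test points
// TEST_AFTER_READ_TARGET at a specific ring, and the hook then rewrites that
// ring's slot sequence right after a value has been read, simulating a
// concurrent overwrite.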
#[cfg(test)]
static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
#[cfg(test)]
static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);
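/// Outcome of a single [`Consumer::poll_up_to`] call: how many items were
/// delivered to the hook (`read`), how many were skipped because they had
/// already been overwritten (`dropped`), and the newest published sequence
/// number observed (`newest`, 0 if nothing has been published yet).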
#[must_use]
#[derive(Copy, Clone, Debug)]
pub struct PollStats {
    pub read: usize,
    pub dropped: usize,
    pub newest: u32,
}
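/// Lossy, overwriting ring buffer for one producer and one consumer.
///
/// The producer never blocks: `push` always overwrites the oldest slot. Each
/// slot carries its own sequence number, which the consumer re-checks after
/// copying a value out (seqlock style); a value whose slot was overwritten
/// mid-read is discarded and counted as dropped. `T: Copy` keeps that discard
/// safe. Sequence numbers start at 1, and 0 means "nothing published yet".
///
/// Illustrative sketch of the intended flow (assumed usage, kept out of the
/// doctest build because the crate path is not shown here):
///
/// ```ignore
/// let ring: SeqRing<u32, 8> = SeqRing::new();
/// let producer = ring.producer();
/// let mut consumer = ring.consumer();
///
/// producer.push(42);
/// let delivered = consumer.poll_one(|seq, value| {
///     // the first push gets sequence number 1
///     assert_eq!((seq, *value), (1, 42));
/// });
/// assert!(delivered);
/// ```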
pub struct SeqRing<T: Copy, const N: usize> {
    /// Last sequence number handed out by the producer.
    next_seq: AtomicU32,
    /// Newest sequence number whose slot write is complete (0 = empty).
    published_seq: AtomicU32,
    /// Per-slot sequence numbers used for the seqlock-style read validation.
    slot_seq: [AtomicU32; N],
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    producer_taken: AtomicBool,
    consumer_taken: AtomicBool,
}
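// SAFETY (design intent): at most one `Producer` and one `Consumer` exist at a
// time (guarded by `producer_taken` / `consumer_taken`), and slot contents are
// only handed across threads by value after the release/acquire sequence
// checks, so sharing the ring between threads only requires `T: Send`.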
unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}

impl<T: Copy, const N: usize> SeqRing<T, N> {
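    /// Creates an empty ring.
    ///
    /// # Panics
    ///
    /// Panics if `N == 0`.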
    pub fn new() -> Self {
        assert!(N > 0);
        Self {
            next_seq: AtomicU32::new(0),
            published_seq: AtomicU32::new(0),
            slot_seq: atomic_u32_array::<N>(0),
            slots: unsafe_cell_array::<T, N>(),
            producer_taken: AtomicBool::new(false),
            consumer_taken: AtomicBool::new(false),
        }
    }

    /// Maps a sequence number (starting at 1) to its slot index.
    #[inline(always)]
    const fn idx_for(seq: u32) -> usize {
        ((seq.wrapping_sub(1)) as usize) % N
    }
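    /// Claims the single producer handle.
    ///
    /// # Panics
    ///
    /// Panics if a `Producer` for this ring is already alive.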
    #[inline]
    pub fn producer(&self) -> Producer<'_, T, N> {
        assert!(
            !self.producer_taken.swap(true, Ordering::AcqRel),
            "SeqRing::producer() called while a producer is active"
        );
        Producer {
            ring: self,
            _not_sync: PhantomData,
        }
    }
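    /// Claims the single consumer handle.
    ///
    /// # Panics
    ///
    /// Panics if a `Consumer` for this ring is already alive.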
    #[inline]
    pub fn consumer(&self) -> Consumer<'_, T, N> {
        assert!(
            !self.consumer_taken.swap(true, Ordering::AcqRel),
            "SeqRing::consumer() called while a consumer is active"
        );
        Consumer {
            ring: self,
            last_seq: 0,
            dropped_accum: 0,
            _not_sync: PhantomData,
        }
    }

    #[inline]
    fn newest_seq(&self) -> u32 {
        self.published_seq.load(Ordering::Acquire)
    }
    #[inline]
    fn push_inner(&self, value: T) -> u32 {
        let mut seq = self
            .next_seq
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if seq == 0 {
            // Sequence 0 is reserved for "nothing published", so skip it when
            // the 32-bit counter wraps around.
            seq = 1;
            self.next_seq.store(1, Ordering::Relaxed);
        }

        let idx = Self::idx_for(seq);
        unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };

        // Publish the slot's sequence first, then the global newest sequence,
        // both with release ordering so readers observe a fully written value.
        self.slot_seq[idx].store(seq, Ordering::Release);
        self.published_seq.store(seq, Ordering::Release);
        seq
    }
    #[inline]
    fn read_seq_inner(&self, seq: u32) -> Option<T> {
        let idx = Self::idx_for(seq);

        let s1 = self.slot_seq[idx].load(Ordering::Acquire);
        if s1 != seq {
            return None;
        }

        let v = unsafe { (*self.slots[idx].get()).assume_init_read() };

        #[cfg(test)]
        self.test_after_read_hook(idx);

        // Seqlock-style validation: if the slot's sequence changed while the
        // value was being copied out, the copy may be torn, so discard it.
        let s2 = self.slot_seq[idx].load(Ordering::Acquire);
        if s2 != seq {
            return None;
        }

        Some(v)
    }
    #[cfg(test)]
    fn test_after_read_hook(&self, idx: usize) {
        let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
        if target == self as *const _ as usize {
            let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
            self.slot_seq[idx].store(seq, Ordering::Release);
            TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
        }
    }
}
impl<T: Copy, const N: usize> Default for SeqRing<T, N> {
    fn default() -> Self {
        Self::new()
    }
}
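/// Write half of a [`SeqRing`]; at most one exists per ring at a time.
/// Dropping it releases the handle so [`SeqRing::producer`] can be called again.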
pub struct Producer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    _not_sync: PhantomData<Cell<()>>,
}

impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
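    /// Publishes `value`, overwriting the oldest slot when the ring is full,
    /// and returns the sequence number assigned to it (sequences start at 1).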
    #[inline]
    pub fn push(&self, value: T) -> u32 {
        self.ring.push_inner(value)
    }
}

impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
    fn drop(&mut self) {
        self.ring.producer_taken.store(false, Ordering::Release);
    }
}
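/// Read half of a [`SeqRing`]; at most one exists per ring at a time.
///
/// The consumer remembers the last sequence number it delivered (`last_seq`)
/// and accumulates the number of dropped items across polls (`dropped_accum`).
/// Dropping it releases the handle so [`SeqRing::consumer`] can be called again.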
pub struct Consumer<'a, T: Copy, const N: usize> {
    ring: &'a SeqRing<T, N>,
    last_seq: u32,
    dropped_accum: usize,
    _not_sync: PhantomData<Cell<()>>,
}

impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
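    /// Total number of items dropped since creation or the last
    /// [`reset_dropped`](Self::reset_dropped).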
    #[inline]
    pub fn dropped(&self) -> usize {
        self.dropped_accum
    }
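    /// Resets the accumulated drop counter to zero.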
    #[inline]
    pub fn reset_dropped(&mut self) {
        self.dropped_accum = 0;
    }
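    /// Polls at most one item in sequence order; returns `true` if an item was
    /// delivered to `hook`.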
    #[inline]
    pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
        // `poll_up_to` takes an `FnMut`, so wrap the `FnOnce` in an `Option`
        // and take it on the (at most one) invocation.
        let mut hook = Some(hook);
        let stats = self.poll_up_to(1, |seq, v| {
            if let Some(hook) = hook.take() {
                hook(seq, v);
            }
        });
        stats.read == 1
    }
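    /// Drains up to `max` items in sequence order, calling `hook` for each one
    /// delivered, and reports how many were read, how many were dropped
    /// (overwritten before they could be read), and the newest published
    /// sequence number observed.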
    pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
        if max == 0 {
            return PollStats {
                read: 0,
                dropped: 0,
                newest: self.ring.newest_seq(),
            };
        }

        let mut newest = self.ring.newest_seq();
        if newest == 0 || newest == self.last_seq {
            return PollStats {
                read: 0,
                dropped: 0,
                newest,
            };
        }

        let mut read = 0usize;
        let mut dropped = 0usize;

        while read < max {
            newest = self.ring.newest_seq();
            if self.last_seq == newest {
                break;
            }

            let lag = newest.wrapping_sub(self.last_seq) as usize;
            if lag > N {
                // The consumer is more than a full ring behind: everything
                // older than the newest N sequences has already been
                // overwritten, so jump the cursor forward and count the
                // skipped items as drops.
                let next = self.last_seq.wrapping_add(1);
                let keep_from = newest.wrapping_sub((N - 1) as u32);
                let jump_drops = keep_from.wrapping_sub(next) as usize;
                dropped += jump_drops;
                self.last_seq = keep_from.wrapping_sub(1);
                continue;
            }

            let next = self.last_seq.wrapping_add(1);

            match self.ring.read_seq_inner(next) {
                Some(v) => {
                    hook(next, &v);
                    self.last_seq = next;
                    read += 1;
                }
                None => {
                    // The slot no longer holds `next` (a newer push overwrote
                    // it), so count it as dropped and move on.
                    self.last_seq = next;
                    dropped += 1;
                }
            }
        }

        self.dropped_accum += dropped;

        PollStats {
            read,
            dropped,
            newest,
        }
    }
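    /// Reads the newest published item without advancing the poll cursor;
    /// returns `false` if nothing has been published or the newest slot was
    /// overwritten while it was being read.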
    #[inline]
    pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
        let newest = self.ring.newest_seq();
        if newest == 0 {
            return false;
        }
        if let Some(v) = self.ring.read_seq_inner(newest) {
            hook(newest, &v);
            true
        } else {
            false
        }
    }
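    /// Moves the poll cursor so that the next poll starts at the newest
    /// published item, silently skipping everything before it.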
    #[inline]
    pub fn skip_to_latest(&mut self) {
        let newest = self.ring.newest_seq();
        if newest != 0 {
            self.last_seq = newest.wrapping_sub(1);
        }
    }
}

impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
    fn drop(&mut self) {
        self.ring.consumer_taken.store(false, Ordering::Release);
    }
}
#[cfg(test)]
mod tests {
    use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
    use core::sync::atomic::Ordering;
    use std::vec::Vec;

    #[test]
    fn poll_one_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();
        let ok = consumer.poll_one(|_, _| {});
        assert!(!ok);
    }

    #[test]
    fn polls_in_order() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 3);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 3);
        assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
    }

    #[test]
    fn drops_when_consumer_lags() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..10 {
            producer.push(i);
        }

        let mut seen = Vec::new();
        let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));

        assert_eq!(stats.read, 4);
        assert_eq!(stats.dropped, 6);
        assert_eq!(stats.newest, 10);
        assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
    }

    #[test]
    fn latest_reads_newest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let consumer = ring.consumer();

        producer.push(1);
        producer.push(2);

        let mut got = None;
        let ok = consumer.latest(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((2, 2)));
    }

    #[test]
    fn skip_to_latest_makes_next_poll_latest() {
        let ring = SeqRing::<u32, 8>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(10);
        producer.push(11);
        producer.push(12);

        consumer.skip_to_latest();

        let mut got = None;
        let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));

        assert!(ok);
        assert_eq!(got, Some((3, 12)));
    }

    #[test]
    fn poll_up_to_zero_returns_newest_only() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        producer.push(42);

        let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.newest, 1);
    }

    #[test]
    fn dropped_counter_can_reset() {
        let ring = SeqRing::<u32, 2>::new();
        let producer = ring.producer();
        let mut consumer = ring.consumer();

        for i in 0..5 {
            producer.push(i);
        }

        let stats = consumer.poll_up_to(10, |_, _| {});

        assert_eq!(consumer.dropped(), stats.dropped);

        consumer.reset_dropped();

        assert_eq!(consumer.dropped(), 0);
    }

    #[test]
    fn latest_empty_returns_false() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    #[test]
    fn latest_returns_false_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let consumer = ring.consumer();

        // Publish a sequence number without ever writing the matching slot,
        // so the slot-sequence check in `read_seq_inner` must fail.
        ring.published_seq.store(1, Ordering::Release);

        let ok = consumer.latest(|_, _| {});

        assert!(!ok);
    }

    #[test]
    fn poll_up_to_counts_dropped_when_slot_missing() {
        let ring = SeqRing::<u32, 4>::new();
        let mut consumer = ring.consumer();

        // As above: the published sequence points at a slot that was never
        // written, so the poll must count it as dropped instead of reading it.
        ring.published_seq.store(1, Ordering::Release);

        let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));

        assert_eq!(stats.read, 0);
        assert_eq!(stats.dropped, 1);
        assert_eq!(consumer.dropped(), 1);
    }

    #[test]
    fn read_seq_inner_detects_overwrite_during_read() {
        let ring = SeqRing::<u32, 4>::new();
        let producer = ring.producer();
        let seq = producer.push(7);

        // Arm the test hook so the slot's sequence number changes between the
        // value copy and the second sequence check inside `read_seq_inner`.
        TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
        TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);

        let got = ring.read_seq_inner(seq);

        TEST_AFTER_READ_TARGET.store(0, Ordering::Release);

        assert!(got.is_none());
    }

    #[test]
    fn push_wraps_seq_from_zero_to_one() {
        let ring = SeqRing::<u32, 4>::new();

        ring.next_seq.store(u32::MAX, Ordering::Relaxed);

        let seq = ring.producer().push(1);

        assert_eq!(seq, 1);
        assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
    }
}