1use core::cell::{Cell, UnsafeCell};
20use core::marker::PhantomData;
21use core::mem::MaybeUninit;
22use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
23#[cfg(test)]
24use core::sync::atomic::AtomicUsize;
25
26fn atomic_u32_array<const N: usize>(init: u32) -> [AtomicU32; N] {
27 core::array::from_fn(|_| AtomicU32::new(init))
28}
29
30fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
31 core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
32}
33
34#[cfg(test)]
35static TEST_AFTER_READ_TARGET: AtomicUsize = AtomicUsize::new(0);
36#[cfg(test)]
37static TEST_AFTER_READ_SEQ: AtomicU32 = AtomicU32::new(0);
38
39#[must_use]
40#[derive(Copy, Clone, Debug)]
41pub struct PollStats {
42 pub read: usize,
44 pub dropped: usize,
46 pub newest: u32,
48}
49
50pub struct SeqRing<T: Copy, const N: usize> {
53 next_seq: AtomicU32,
54 published_seq: AtomicU32,
55 slot_seq: [AtomicU32; N],
56 slots: [UnsafeCell<MaybeUninit<T>>; N],
57 producer_taken: AtomicBool,
58 consumer_taken: AtomicBool,
59}
60
61unsafe impl<T: Copy + Send, const N: usize> Sync for SeqRing<T, N> {}
66
67impl<T: Copy, const N: usize> SeqRing<T, N> {
68 pub fn new() -> Self {
73 assert!(N > 0);
74 Self {
75 next_seq: AtomicU32::new(0),
76 published_seq: AtomicU32::new(0),
77 slot_seq: atomic_u32_array::<N>(0),
78 slots: unsafe_cell_array::<T, N>(),
79 producer_taken: AtomicBool::new(false),
80 consumer_taken: AtomicBool::new(false),
81 }
82 }
83
84 #[inline(always)]
85 const fn idx_for(seq: u32) -> usize {
86 ((seq.wrapping_sub(1)) as usize) % N
87 }
88
89 #[inline]
94 pub fn producer(&self) -> Producer<'_, T, N> {
95 assert!(
96 !self.producer_taken.swap(true, Ordering::AcqRel),
97 "SeqRing::producer() called while a producer is active"
98 );
99 Producer {
100 ring: self,
101 _not_sync: PhantomData,
102 }
103 }
104
105 #[inline]
110 pub fn consumer(&self) -> Consumer<'_, T, N> {
111 assert!(
112 !self.consumer_taken.swap(true, Ordering::AcqRel),
113 "SeqRing::consumer() called while a consumer is active"
114 );
115 Consumer {
116 ring: self,
117 last_seq: 0,
118 dropped_accum: 0,
119 _not_sync: PhantomData,
120 }
121 }
122
123 #[inline]
124 fn newest_seq(&self) -> u32 {
125 self.published_seq.load(Ordering::Acquire)
126 }
127
128 #[inline]
129 fn push_inner(&self, value: T) -> u32 {
130 let mut seq = self
131 .next_seq
132 .fetch_add(1, Ordering::Relaxed)
133 .wrapping_add(1);
134 if seq == 0 {
135 seq = 1;
136 self.next_seq.store(1, Ordering::Relaxed);
137 }
138
139 let idx = Self::idx_for(seq);
140 unsafe { (*self.slots[idx].get()).as_mut_ptr().write(value) };
141
142 self.slot_seq[idx].store(seq, Ordering::Release);
143 self.published_seq.store(seq, Ordering::Release);
144 seq
145 }
146
147 #[inline]
148 fn read_seq_inner(&self, seq: u32) -> Option<T> {
149 let idx = Self::idx_for(seq);
150
151 let s1 = self.slot_seq[idx].load(Ordering::Acquire);
152 if s1 != seq {
153 return None;
154 }
155
156 let v = unsafe { (*self.slots[idx].get()).assume_init_read() };
157
158 #[cfg(test)]
159 self.test_after_read_hook(idx);
160
161 let s2 = self.slot_seq[idx].load(Ordering::Acquire);
162 if s2 != seq {
163 return None;
164 }
165
166 Some(v)
167 }
168
169 #[cfg(test)]
170 fn test_after_read_hook(&self, idx: usize) {
171 let target = TEST_AFTER_READ_TARGET.load(Ordering::Acquire);
172 if target == self as *const _ as usize {
173 let seq = TEST_AFTER_READ_SEQ.load(Ordering::Relaxed);
174 self.slot_seq[idx].store(seq, Ordering::Release);
175 TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
176 }
177 }
178}
179
180pub struct Producer<'a, T: Copy, const N: usize> {
184 ring: &'a SeqRing<T, N>,
185 _not_sync: PhantomData<Cell<()>>,
186}
187
188impl<'a, T: Copy, const N: usize> Producer<'a, T, N> {
189 #[inline]
193 pub fn push(&self, value: T) -> u32 {
194 self.ring.push_inner(value)
195 }
196}
197
198impl<'a, T: Copy, const N: usize> Drop for Producer<'a, T, N> {
199 fn drop(&mut self) {
200 self.ring.producer_taken.store(false, Ordering::Release);
201 }
202}
203
204pub struct Consumer<'a, T: Copy, const N: usize> {
208 ring: &'a SeqRing<T, N>,
209 last_seq: u32,
210 dropped_accum: usize,
211 _not_sync: PhantomData<Cell<()>>,
212}
213
214impl<'a, T: Copy, const N: usize> Consumer<'a, T, N> {
215 #[inline]
217 pub fn dropped(&self) -> usize {
218 self.dropped_accum
219 }
220
221 #[inline]
223 pub fn reset_dropped(&mut self) {
224 self.dropped_accum = 0;
225 }
226
227 #[inline]
230 pub fn poll_one(&mut self, hook: impl FnOnce(u32, &T)) -> bool {
231 let mut hook = Some(hook);
232 let stats = self.poll_up_to(1, |seq, v| {
233 if let Some(hook) = hook.take() {
234 hook(seq, v);
235 }
236 });
237 stats.read == 1
238 }
239
240 pub fn poll_up_to(&mut self, max: usize, mut hook: impl FnMut(u32, &T)) -> PollStats {
246 if max == 0 {
247 return PollStats {
248 read: 0,
249 dropped: 0,
250 newest: self.ring.newest_seq(),
251 };
252 }
253
254 let mut newest = self.ring.newest_seq();
255 if newest == 0 || newest == self.last_seq {
256 return PollStats {
257 read: 0,
258 dropped: 0,
259 newest,
260 };
261 }
262
263 let mut read = 0usize;
264 let mut dropped = 0usize;
265
266 while read < max {
267 newest = self.ring.newest_seq();
268 if self.last_seq == newest {
269 break;
270 }
271
272 let lag = newest.wrapping_sub(self.last_seq) as usize;
273 if lag > N {
274 let next = self.last_seq.wrapping_add(1);
275 let keep_from = newest.wrapping_sub((N - 1) as u32);
276 let jump_drops = keep_from.wrapping_sub(next) as usize;
277 dropped += jump_drops;
278 self.last_seq = keep_from.wrapping_sub(1);
279 continue;
280 }
281
282 let next = self.last_seq.wrapping_add(1);
283
284 match self.ring.read_seq_inner(next) {
285 Some(v) => {
286 hook(next, &v);
287 self.last_seq = next;
288 read += 1;
289 }
290 None => {
291 self.last_seq = next;
292 dropped += 1;
293 }
294 }
295 }
296
297 self.dropped_accum += dropped;
298
299 PollStats {
300 read,
301 dropped,
302 newest,
303 }
304 }
305
306 #[inline]
311 pub fn latest(&self, hook: impl FnOnce(u32, &T)) -> bool {
312 let newest = self.ring.newest_seq();
313 if newest == 0 {
314 return false;
315 }
316 if let Some(v) = self.ring.read_seq_inner(newest) {
317 hook(newest, &v);
318 true
319 } else {
320 false
321 }
322 }
323
324 #[inline]
329 pub fn skip_to_latest(&mut self) {
330 let newest = self.ring.newest_seq();
331 if newest != 0 {
332 self.last_seq = newest.wrapping_sub(1);
333 }
334 }
335}
336
337impl<'a, T: Copy, const N: usize> Drop for Consumer<'a, T, N> {
338 fn drop(&mut self) {
339 self.ring.consumer_taken.store(false, Ordering::Release);
340 }
341}
342
343#[cfg(test)]
344mod tests {
345 use super::{SeqRing, TEST_AFTER_READ_SEQ, TEST_AFTER_READ_TARGET};
346 use core::sync::atomic::Ordering;
347 use std::vec::Vec;
348
349 #[test]
350 fn poll_one_empty_returns_false() {
351 let ring = SeqRing::<u32, 4>::new();
352 let mut consumer = ring.consumer();
353 let ok = consumer.poll_one(|_, _| {});
354 assert!(!ok);
355 }
356
357 #[test]
358 fn polls_in_order() {
359 let ring = SeqRing::<u32, 8>::new();
360 let producer = ring.producer();
361 let mut consumer = ring.consumer();
362
363 producer.push(10);
364 producer.push(11);
365 producer.push(12);
366
367 let mut seen = Vec::new();
368 let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));
369
370 assert_eq!(stats.read, 3);
371 assert_eq!(stats.dropped, 0);
372 assert_eq!(stats.newest, 3);
373 assert_eq!(&seen[..], &[(1, 10), (2, 11), (3, 12)]);
374 }
375
376 #[test]
377 fn drops_when_consumer_lags() {
378 let ring = SeqRing::<u32, 4>::new();
379 let producer = ring.producer();
380 let mut consumer = ring.consumer();
381
382 for i in 0..10 {
383 producer.push(i);
384 }
385
386 let mut seen = Vec::new();
387 let stats = consumer.poll_up_to(10, |seq, v| seen.push((seq, *v)));
388
389 assert_eq!(stats.read, 4);
390 assert_eq!(stats.dropped, 6);
391 assert_eq!(stats.newest, 10);
392 assert_eq!(&seen[..], &[(7, 6), (8, 7), (9, 8), (10, 9)]);
393 }
394
395 #[test]
396 fn latest_reads_newest() {
397 let ring = SeqRing::<u32, 8>::new();
398 let producer = ring.producer();
399 let consumer = ring.consumer();
400
401 producer.push(1);
402 producer.push(2);
403
404 let mut got = None;
405 let ok = consumer.latest(|seq, v| got = Some((seq, *v)));
406
407 assert!(ok);
408 assert_eq!(got, Some((2, 2)));
409 }
410
411 #[test]
412 fn skip_to_latest_makes_next_poll_latest() {
413 let ring = SeqRing::<u32, 8>::new();
414 let producer = ring.producer();
415 let mut consumer = ring.consumer();
416
417 producer.push(10);
418 producer.push(11);
419 producer.push(12);
420
421 consumer.skip_to_latest();
422
423 let mut got = None;
424 let ok = consumer.poll_one(|seq, v| got = Some((seq, *v)));
425
426 assert!(ok);
427 assert_eq!(got, Some((3, 12)));
428 }
429
430 #[test]
431 fn poll_up_to_zero_returns_newest_only() {
432 let ring = SeqRing::<u32, 4>::new();
433 let producer = ring.producer();
434 let mut consumer = ring.consumer();
435
436 producer.push(42);
437
438 let stats = consumer.poll_up_to(0, |_, _| panic!("hook should not run"));
439
440 assert_eq!(stats.read, 0);
441 assert_eq!(stats.dropped, 0);
442 assert_eq!(stats.newest, 1);
443 }
444
445 #[test]
446 fn dropped_counter_can_reset() {
447 let ring = SeqRing::<u32, 2>::new();
448 let producer = ring.producer();
449 let mut consumer = ring.consumer();
450
451 for i in 0..5 {
452 producer.push(i);
453 }
454
455 let stats = consumer.poll_up_to(10, |_, _| {});
456
457 assert_eq!(consumer.dropped(), stats.dropped);
458
459 consumer.reset_dropped();
460
461 assert_eq!(consumer.dropped(), 0);
462 }
463
464 #[test]
465 fn latest_empty_returns_false() {
466 let ring = SeqRing::<u32, 4>::new();
467 let consumer = ring.consumer();
468
469 let ok = consumer.latest(|_, _| {});
470
471 assert!(!ok);
472 }
473
474 #[test]
475 fn latest_returns_false_when_slot_missing() {
476 let ring = SeqRing::<u32, 4>::new();
477 let consumer = ring.consumer();
478
479 ring.published_seq.store(1, Ordering::Release);
480
481 let ok = consumer.latest(|_, _| {});
482
483 assert!(!ok);
484 }
485
486 #[test]
487 fn poll_up_to_counts_dropped_when_slot_missing() {
488 let ring = SeqRing::<u32, 4>::new();
489 let mut consumer = ring.consumer();
490
491 ring.published_seq.store(1, Ordering::Release);
492
493 let stats = consumer.poll_up_to(1, |_, _| panic!("hook should not run"));
494
495 assert_eq!(stats.read, 0);
496 assert_eq!(stats.dropped, 1);
497 assert_eq!(consumer.dropped(), 1);
498 }
499
500 #[test]
501 fn read_seq_inner_detects_overwrite_during_read() {
502 let ring = SeqRing::<u32, 4>::new();
503 let producer = ring.producer();
504 let seq = producer.push(7);
505
506 TEST_AFTER_READ_SEQ.store(seq.wrapping_add(1), Ordering::Relaxed);
507 TEST_AFTER_READ_TARGET.store(&ring as *const _ as usize, Ordering::Release);
508
509 let got = ring.read_seq_inner(seq);
510
511 TEST_AFTER_READ_TARGET.store(0, Ordering::Release);
512
513 assert!(got.is_none());
514 }
515
516 #[test]
517 fn push_wraps_seq_from_zero_to_one() {
518 let ring = SeqRing::<u32, 4>::new();
519
520 ring.next_seq.store(u32::MAX, Ordering::Relaxed);
521
522 let seq = ring.producer().push(1);
523
524 assert_eq!(seq, 1);
525 assert_eq!(ring.next_seq.load(Ordering::Relaxed), 1);
526 }
527}