1use super::QueueStats;
7use crate::buffer::AtomicCell;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10#[repr(C, align(64))]
15pub struct RingQueue<T: Copy, const CAP: usize> {
16 data: [AtomicCell<T>; CAP],
18 write_pos: AtomicUsize,
20 mask: usize,
22 stats: QueueStats,
24}
25
26impl<T: Copy + Default, const CAP: usize> Default for RingQueue<T, CAP> {
27 fn default() -> Self { Self::new() }
28}
29
30impl<T: Copy + Default, const CAP: usize> RingQueue<T, CAP> {
31 pub fn new() -> Self {
33 assert!(CAP.is_power_of_two(), "CAP must be a power of two");
34
35 let data = std::array::from_fn(|_| AtomicCell::new(T::default()));
36
37 Self {
38 data,
39 write_pos: AtomicUsize::new(0),
40 mask: CAP - 1,
41 stats: QueueStats::new(),
42 }
43 }
44
45 pub fn push(&self, value: T) {
47 let pos = self.write_pos.load(Ordering::Relaxed);
48 self.data[pos].store(value);
49 self.write_pos
50 .store((pos + 1) & self.mask, Ordering::Release);
51 self.stats.record_push(self.len());
52 }
53
54 pub fn read_delayed(&self, delay: usize) -> T {
59 assert!(delay < CAP, "Delay must be less than CAP");
60
61 let write_pos = self.write_pos.load(Ordering::Acquire);
62 let read_pos = (write_pos + CAP - delay - 1) & self.mask;
63
64 self.data[read_pos].load()
65 }
66
67 pub fn read_interpolated(&self, delay_frac: f64) -> T
69 where
70 T: From<f64> + Into<f64>,
71 {
72 let delay_int = delay_frac.floor() as usize;
73 let frac = delay_frac.fract();
74
75 let s1: f64 = self.read_delayed(delay_int).into();
76 let s2: f64 = self.read_delayed(delay_int + 1).into();
77
78 T::from(s1 * (1.0 - frac) + s2 * frac)
79 }
80
81 pub fn read_at(&self, index: usize) -> T {
83 let write_pos = self.write_pos.load(Ordering::Acquire);
84 let read_pos = (write_pos + CAP - index - 1) & self.mask;
85 self.data[read_pos].load()
86 }
87
88 pub fn push_slice(&self, slice: &[T]) {
90 for &value in slice {
91 self.push(value);
92 }
93 }
94
95 pub fn read_slice_delayed(&self, delay: usize, output: &mut [T]) {
97 for (i, out) in output.iter_mut().enumerate() {
98 *out = self.read_delayed(delay + i);
99 }
100 }
101
102 pub fn write_pos(&self) -> usize {
104 self.write_pos.load(Ordering::Acquire)
105 }
106
107 pub const fn capacity(&self) -> usize {
109 CAP
110 }
111
112 pub fn len(&self) -> usize {
114 CAP
115 }
116
117 pub fn is_empty(&self) -> bool {
119 self.len() == 0
120 }
121
122 pub fn reset(&self) {
124 self.write_pos.store(0, Ordering::Release);
125 }
126
127 pub fn stats(&self) -> &QueueStats {
129 &self.stats
130 }
131}
132
133#[allow(unsafe_code)]
134unsafe impl<T: Copy + Send, const CAP: usize> Send for RingQueue<T, CAP> {}
135#[allow(unsafe_code)]
136unsafe impl<T: Copy + Sync, const CAP: usize> Sync for RingQueue<T, CAP> {}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// Filling the queue exactly once: delay 0 is the newest value and each
    /// further step of delay goes one push back.
    #[test]
    fn test_ring_queue_basic() {
        let queue = RingQueue::<i32, 4>::new();

        for sample in 1..=4 {
            queue.push(sample);
        }

        for (delay, &expected) in [4, 3, 2, 1].iter().enumerate() {
            assert_eq!(queue.read_delayed(delay), expected);
        }
    }

    /// Pushing more values than there are slots keeps only the newest four.
    #[test]
    fn test_ring_queue_wraparound() {
        let queue = RingQueue::<i32, 4>::new();

        (0..10).for_each(|sample| queue.push(sample));

        for (delay, &expected) in [9, 8, 7, 6].iter().enumerate() {
            assert_eq!(queue.read_delayed(delay), expected);
        }
    }

    /// A delay of 1.5 lies halfway between the values at delays 1 and 2.
    #[test]
    fn test_ring_queue_interpolated() {
        let queue = RingQueue::<f64, 4>::new();

        for &sample in &[1.0, 2.0, 3.0, 4.0] {
            queue.push(sample);
        }

        let error = (queue.read_interpolated(1.5) - 2.5).abs();
        assert!(error < 0.001);
    }
}