1use super::QueueStats;
7use crate::buffer::AtomicCell;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10#[repr(C, align(64))]
15pub struct RingQueue<T: Copy, const CAP: usize> {
16 data: [AtomicCell<T>; CAP],
18 write_pos: AtomicUsize,
20 mask: usize,
22 stats: QueueStats,
24}
25
26impl<T: Copy + Default, const CAP: usize> Default for RingQueue<T, CAP> {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl<T: Copy + Default, const CAP: usize> RingQueue<T, CAP> {
33 pub fn new() -> Self {
35 assert!(CAP.is_power_of_two(), "CAP must be a power of two");
36
37 let data = std::array::from_fn(|_| AtomicCell::new(T::default()));
38
39 Self {
40 data,
41 write_pos: AtomicUsize::new(0),
42 mask: CAP - 1,
43 stats: QueueStats::new(),
44 }
45 }
46
47 pub fn push(&self, value: T) {
49 let pos = self.write_pos.load(Ordering::Relaxed);
50 self.data[pos].store(value);
51 self.write_pos
52 .store((pos + 1) & self.mask, Ordering::Release);
53 self.stats.record_push(self.len());
54 }
55
56 pub fn read_delayed(&self, delay: usize) -> T {
61 assert!(delay < CAP, "Delay must be less than CAP");
62
63 let write_pos = self.write_pos.load(Ordering::Acquire);
64 let read_pos = (write_pos + CAP - delay - 1) & self.mask;
65
66 self.data[read_pos].load()
67 }
68
69 pub fn read_interpolated(&self, delay_frac: f64) -> T
71 where
72 T: From<f64> + Into<f64>,
73 {
74 let delay_int = delay_frac.floor() as usize;
75 let frac = delay_frac.fract();
76
77 let s1: f64 = self.read_delayed(delay_int).into();
78 let s2: f64 = self.read_delayed(delay_int + 1).into();
79
80 T::from(s1 * (1.0 - frac) + s2 * frac)
81 }
82
83 pub fn read_at(&self, index: usize) -> T {
85 let write_pos = self.write_pos.load(Ordering::Acquire);
86 let read_pos = (write_pos + CAP - index - 1) & self.mask;
87 self.data[read_pos].load()
88 }
89
90 pub fn push_slice(&self, slice: &[T]) {
92 for &value in slice {
93 self.push(value);
94 }
95 }
96
97 pub fn read_slice_delayed(&self, delay: usize, output: &mut [T]) {
99 for (i, out) in output.iter_mut().enumerate() {
100 *out = self.read_delayed(delay + i);
101 }
102 }
103
104 pub fn write_pos(&self) -> usize {
106 self.write_pos.load(Ordering::Acquire)
107 }
108
109 pub const fn capacity(&self) -> usize {
111 CAP
112 }
113
114 pub fn len(&self) -> usize {
116 CAP
117 }
118
119 pub fn is_empty(&self) -> bool {
121 self.len() == 0
122 }
123
124 pub fn reset(&self) {
126 self.write_pos.store(0, Ordering::Release);
127 }
128
129 pub fn stats(&self) -> &QueueStats {
131 &self.stats
132 }
133}
134
135#[allow(unsafe_code)]
136unsafe impl<T: Copy + Send, const CAP: usize> Send for RingQueue<T, CAP> {}
137#[allow(unsafe_code)]
138unsafe impl<T: Copy + Sync, const CAP: usize> Sync for RingQueue<T, CAP> {}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// After exactly `CAP` pushes, delay 0 is the newest value and delay
    /// `CAP - 1` is the oldest.
    #[test]
    fn test_ring_queue_basic() {
        let queue = RingQueue::<i32, 4>::new();

        for sample in 1..=4 {
            queue.push(sample);
        }

        let newest_first = [4, 3, 2, 1];
        for (delay, &expected) in newest_first.iter().enumerate() {
            assert_eq!(queue.read_delayed(delay), expected);
        }
    }

    /// Pushing more than `CAP` values overwrites the oldest ones; only the
    /// last `CAP` values remain readable.
    #[test]
    fn test_ring_queue_wraparound() {
        let queue = RingQueue::<i32, 4>::new();

        (0..10).for_each(|i| queue.push(i));

        let newest_first = [9, 8, 7, 6];
        for (delay, &expected) in newest_first.iter().enumerate() {
            assert_eq!(queue.read_delayed(delay), expected);
        }
    }

    /// A delay of 1.5 lies halfway between the samples at delays 1 and 2
    /// (3.0 and 2.0), so the result is 2.5.
    #[test]
    fn test_ring_queue_interpolated() {
        let queue = RingQueue::<f64, 4>::new();

        for sample in [1.0, 2.0, 3.0, 4.0] {
            queue.push(sample);
        }

        let blended = queue.read_interpolated(1.5);
        let error = (blended - 2.5).abs();
        assert!(error < 0.001);
    }
}