1use super::QueueStats;
7use crate::buffer::AtomicCell;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10#[repr(C, align(64))]
15pub struct RingQueue<T: Copy, const CAP: usize> {
16 data: [AtomicCell<T>; CAP],
18 write_pos: AtomicUsize,
20 mask: usize,
22 stats: QueueStats,
24}
25
26impl<T: Copy + Default, const CAP: usize> RingQueue<T, CAP> {
27 pub fn new() -> Self {
29 assert!(CAP.is_power_of_two(), "CAP must be a power of two");
30
31 let data = std::array::from_fn(|_| AtomicCell::new(T::default()));
32
33 Self {
34 data,
35 write_pos: AtomicUsize::new(0),
36 mask: CAP - 1,
37 stats: QueueStats::new(),
38 }
39 }
40
41 pub fn push(&self, value: T) {
43 let pos = self.write_pos.load(Ordering::Relaxed);
44 self.data[pos].store(value);
45 self.write_pos
46 .store((pos + 1) & self.mask, Ordering::Release);
47 self.stats.record_push(self.len());
48 }
49
50 pub fn read_delayed(&self, delay: usize) -> T {
55 assert!(delay < CAP, "Delay must be less than CAP");
56
57 let write_pos = self.write_pos.load(Ordering::Acquire);
58 let read_pos = (write_pos + CAP - delay - 1) & self.mask;
59
60 self.data[read_pos].load()
61 }
62
63 pub fn read_interpolated(&self, delay_frac: f64) -> T
65 where
66 T: From<f64> + Into<f64>,
67 {
68 let delay_int = delay_frac.floor() as usize;
69 let frac = delay_frac.fract();
70
71 let s1: f64 = self.read_delayed(delay_int).into();
72 let s2: f64 = self.read_delayed(delay_int + 1).into();
73
74 T::from(s1 * (1.0 - frac) + s2 * frac)
75 }
76
77 pub fn read_at(&self, index: usize) -> T {
79 let write_pos = self.write_pos.load(Ordering::Acquire);
80 let read_pos = (write_pos + CAP - index - 1) & self.mask;
81 self.data[read_pos].load()
82 }
83
84 pub fn push_slice(&self, slice: &[T]) {
86 for &value in slice {
87 self.push(value);
88 }
89 }
90
91 pub fn read_slice_delayed(&self, delay: usize, output: &mut [T]) {
93 for (i, out) in output.iter_mut().enumerate() {
94 *out = self.read_delayed(delay + i);
95 }
96 }
97
98 pub fn write_pos(&self) -> usize {
100 self.write_pos.load(Ordering::Acquire)
101 }
102
103 pub const fn capacity(&self) -> usize {
105 CAP
106 }
107
108 pub fn len(&self) -> usize {
110 CAP
111 }
112
113 pub fn reset(&self) {
115 self.write_pos.store(0, Ordering::Release);
116 }
117
118 pub fn stats(&self) -> &QueueStats {
120 &self.stats
121 }
122}
123
124#[allow(unsafe_code)]
125unsafe impl<T: Copy + Send, const CAP: usize> Send for RingQueue<T, CAP> {}
126#[allow(unsafe_code)]
127unsafe impl<T: Copy + Sync, const CAP: usize> Sync for RingQueue<T, CAP> {}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132
133 #[test]
134 fn test_ring_queue_basic() {
135 let queue = RingQueue::<i32, 4>::new();
136
137 queue.push(1);
138 queue.push(2);
139 queue.push(3);
140 queue.push(4);
141
142 assert_eq!(queue.read_delayed(0), 4);
143 assert_eq!(queue.read_delayed(1), 3);
144 assert_eq!(queue.read_delayed(2), 2);
145 assert_eq!(queue.read_delayed(3), 1);
146 }
147
148 #[test]
149 fn test_ring_queue_wraparound() {
150 let queue = RingQueue::<i32, 4>::new();
151
152 for i in 0..10 {
153 queue.push(i);
154 }
155
156 assert_eq!(queue.read_delayed(0), 9);
158 assert_eq!(queue.read_delayed(1), 8);
159 assert_eq!(queue.read_delayed(2), 7);
160 assert_eq!(queue.read_delayed(3), 6);
161 }
162
163 #[test]
164 fn test_ring_queue_interpolated() {
165 let queue = RingQueue::<f64, 4>::new();
166
167 queue.push(1.0);
168 queue.push(2.0);
169 queue.push(3.0);
170 queue.push(4.0);
171
172 let val = queue.read_interpolated(1.5);
173 assert!((val - 2.5).abs() < 0.001);
174 }
175}