Skip to main content

rill_core/queues/
ring.rs

1//! # Кольцевая очередь с произвольным доступом
2//!
3//! [`RingQueue`](crate::queues::ring::RingQueue) — гибрид между кольцевым буфером и очередью,
4//! позволяющий читать данные с произвольной задержкой.
5
6use super::QueueStats;
7use crate::buffer::AtomicCell;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10/// Кольцевая очередь с произвольным доступом
11///
12/// Позволяет читать данные не только из головы, но и с произвольной
13/// задержкой. Полезно для эффектов задержки и реверберации.
14#[repr(C, align(64))]
15pub struct RingQueue<T: Copy, const CAP: usize> {
16    /// Данные
17    data: [AtomicCell<T>; CAP],
18    /// Индекс записи
19    write_pos: AtomicUsize,
20    /// Маска для быстрого вычисления
21    mask: usize,
22    /// Статистика
23    stats: QueueStats,
24}
25
26impl<T: Copy + Default, const CAP: usize> Default for RingQueue<T, CAP> {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl<T: Copy + Default, const CAP: usize> RingQueue<T, CAP> {
33    /// Создать новую кольцевую очередь
34    pub fn new() -> Self {
35        assert!(CAP.is_power_of_two(), "CAP must be a power of two");
36
37        let data = std::array::from_fn(|_| AtomicCell::new(T::default()));
38
39        Self {
40            data,
41            write_pos: AtomicUsize::new(0),
42            mask: CAP - 1,
43            stats: QueueStats::new(),
44        }
45    }
46
47    /// Записать элемент (всегда успешно)
48    pub fn push(&self, value: T) {
49        let pos = self.write_pos.load(Ordering::Relaxed);
50        self.data[pos].store(value);
51        self.write_pos
52            .store((pos + 1) & self.mask, Ordering::Release);
53        self.stats.record_push(self.len());
54    }
55
56    /// Прочитать элемент с задержкой
57    ///
58    /// # Arguments
59    /// * `delay` - задержка в семплах (0 = последний записанный)
60    pub fn read_delayed(&self, delay: usize) -> T {
61        assert!(delay < CAP, "Delay must be less than CAP");
62
63        let write_pos = self.write_pos.load(Ordering::Acquire);
64        let read_pos = (write_pos + CAP - delay - 1) & self.mask;
65
66        self.data[read_pos].load()
67    }
68
69    /// Прочитать элемент с плавающей задержкой (линейная интерполяция)
70    pub fn read_interpolated(&self, delay_frac: f64) -> T
71    where
72        T: From<f64> + Into<f64>,
73    {
74        let delay_int = delay_frac.floor() as usize;
75        let frac = delay_frac.fract();
76
77        let s1: f64 = self.read_delayed(delay_int).into();
78        let s2: f64 = self.read_delayed(delay_int + 1).into();
79
80        T::from(s1 * (1.0 - frac) + s2 * frac)
81    }
82
83    /// Прочитать элемент по абсолютному индексу
84    pub fn read_at(&self, index: usize) -> T {
85        let write_pos = self.write_pos.load(Ordering::Acquire);
86        let read_pos = (write_pos + CAP - index - 1) & self.mask;
87        self.data[read_pos].load()
88    }
89
90    /// Записать массив данных
91    pub fn push_slice(&self, slice: &[T]) {
92        for &value in slice {
93            self.push(value);
94        }
95    }
96
97    /// Прочитать срез данных с задержкой
98    pub fn read_slice_delayed(&self, delay: usize, output: &mut [T]) {
99        for (i, out) in output.iter_mut().enumerate() {
100            *out = self.read_delayed(delay + i);
101        }
102    }
103
104    /// Текущая позиция записи
105    pub fn write_pos(&self) -> usize {
106        self.write_pos.load(Ordering::Acquire)
107    }
108
109    /// Ёмкость
110    pub const fn capacity(&self) -> usize {
111        CAP
112    }
113
114    /// Количество записанных элементов (не больше CAP)
115    pub fn len(&self) -> usize {
116        CAP
117    }
118
119    /// Returns `true` if no elements have been written.
120    pub fn is_empty(&self) -> bool {
121        self.len() == 0
122    }
123
124    /// Сбросить позицию записи
125    pub fn reset(&self) {
126        self.write_pos.store(0, Ordering::Release);
127    }
128
129    /// Получить статистику
130    pub fn stats(&self) -> &QueueStats {
131        &self.stats
132    }
133}
134
135#[allow(unsafe_code)]
136unsafe impl<T: Copy + Send, const CAP: usize> Send for RingQueue<T, CAP> {}
137#[allow(unsafe_code)]
138unsafe impl<T: Copy + Sync, const CAP: usize> Sync for RingQueue<T, CAP> {}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143
144    #[test]
145    fn test_ring_queue_basic() {
146        let queue = RingQueue::<i32, 4>::new();
147
148        queue.push(1);
149        queue.push(2);
150        queue.push(3);
151        queue.push(4);
152
153        assert_eq!(queue.read_delayed(0), 4);
154        assert_eq!(queue.read_delayed(1), 3);
155        assert_eq!(queue.read_delayed(2), 2);
156        assert_eq!(queue.read_delayed(3), 1);
157    }
158
159    #[test]
160    fn test_ring_queue_wraparound() {
161        let queue = RingQueue::<i32, 4>::new();
162
163        for i in 0..10 {
164            queue.push(i);
165        }
166
167        // После переполнения должны быть последние 4 значения
168        assert_eq!(queue.read_delayed(0), 9);
169        assert_eq!(queue.read_delayed(1), 8);
170        assert_eq!(queue.read_delayed(2), 7);
171        assert_eq!(queue.read_delayed(3), 6);
172    }
173
174    #[test]
175    fn test_ring_queue_interpolated() {
176        let queue = RingQueue::<f64, 4>::new();
177
178        queue.push(1.0);
179        queue.push(2.0);
180        queue.push(3.0);
181        queue.push(4.0);
182
183        let val = queue.read_interpolated(1.5);
184        assert!((val - 2.5).abs() < 0.001);
185    }
186}