Skip to main content

rill_core/queues/
spsc.rs

1use std::fmt;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3
4use super::{OverflowPolicy, QueueError, QueueResult, QueueStatsSnapshot, RtQueueBase};
5use crate::buffer::AtomicCell;
6
7// =============================================================================
8// Основная структура
9// =============================================================================
10
11#[repr(C, align(64))]
12pub struct SpscQueue<T: Copy, const CAP: usize> {
13    buffer: [AtomicCell<T>; CAP],
14    head: AtomicUsize,
15    tail: AtomicUsize,
16    full: AtomicBool,
17    mask: usize,
18    overflow_policy: OverflowPolicy,
19    default_value: Option<T>,
20}
21
22impl<T: Copy + Default, const CAP: usize> SpscQueue<T, CAP> {
23    /// Создать новую очередь
24    pub fn new() -> Self {
25        assert!(CAP.is_power_of_two(), "CAP must be a power of two");
26
27        let buffer = std::array::from_fn(|_| AtomicCell::new(T::default()));
28
29        Self {
30            buffer,
31            head: AtomicUsize::new(0),
32            tail: AtomicUsize::new(0),
33            full: AtomicBool::new(false),
34            mask: CAP - 1,
35            overflow_policy: OverflowPolicy::OverwriteOldest,
36            default_value: None,
37        }
38    }
39
40    /// Создать очередь с указанными политиками
41    pub fn with_policies(overflow_policy: OverflowPolicy, default_value: Option<T>) -> Self {
42        let mut queue = Self::new();
43        queue.overflow_policy = overflow_policy;
44        queue.default_value = default_value;
45        queue
46    }
47
48    /// Добавить элемент
49    pub fn push(&self, value: T) -> QueueResult<()> {
50        let head = self.head.load(Ordering::Relaxed);
51        let next_head = (head + 1) & self.mask;
52
53        // Проверка на переполнение
54        if self.full.load(Ordering::Acquire) {
55            match self.overflow_policy {
56                OverflowPolicy::OverwriteOldest => {
57                    // Перезаписываем самый старый элемент
58                    // Сдвигаем tail, чтобы освободить место
59                    let _ = self.tail.fetch_add(1, Ordering::Release) & self.mask;
60                    self.full.store(false, Ordering::Release);
61                }
62
63                OverflowPolicy::DropNewest => {
64                    return Err(QueueError::QueueFull);
65                }
66
67                OverflowPolicy::Panic => {
68                    panic!("SpscQueue overflow (capacity: {})", CAP);
69                }
70
71                OverflowPolicy::Block => {
72                    return Err(QueueError::QueueFull);
73                }
74            }
75        }
76
77        self.buffer[head].store(value);
78
79        // Обновляем head
80        self.head.store(next_head, Ordering::Release);
81
82        // Если после записи head догоняет tail, значит очередь полна
83        if next_head == self.tail.load(Ordering::Acquire) {
84            self.full.store(true, Ordering::Release);
85        }
86
87        Ok(())
88    }
89
90    /// Извлечь элемент
91    pub fn pop(&self) -> Option<T> {
92        if self.is_empty() {
93            return self.default_value;
94        }
95
96        let tail = self.tail.load(Ordering::Relaxed);
97        let value = self.buffer[tail].load();
98
99        let next_tail = (tail + 1) & self.mask;
100        self.tail.store(next_tail, Ordering::Release);
101
102        // После извлечения очередь уже не полна
103        self.full.store(false, Ordering::Release);
104
105        Some(value)
106    }
107
108    /// Получить элемент без удаления
109    pub fn peek(&self) -> Option<T> {
110        if self.is_empty() {
111            None
112        } else {
113            let tail = self.tail.load(Ordering::Acquire);
114            Some(self.buffer[tail].load())
115        }
116    }
117
118    /// Текущий размер
119    pub fn len(&self) -> usize {
120        if self.full.load(Ordering::Acquire) {
121            CAP
122        } else {
123            let head = self.head.load(Ordering::Acquire);
124            let tail = self.tail.load(Ordering::Acquire);
125
126            if head >= tail {
127                head - tail
128            } else {
129                CAP - tail + head
130            }
131        }
132    }
133
134    /// Вместимость
135    pub const fn capacity(&self) -> usize {
136        CAP
137    }
138
139    /// Проверить, пуста ли очередь
140    pub fn is_empty(&self) -> bool {
141        !self.full.load(Ordering::Acquire)
142            && self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
143    }
144
145    /// Проверить, полна ли очередь
146    pub fn is_full(&self) -> bool {
147        self.full.load(Ordering::Acquire)
148    }
149
150    /// Очистить очередь
151    pub fn clear(&self) {
152        self.head.store(0, Ordering::Relaxed);
153        self.tail.store(0, Ordering::Relaxed);
154        self.full.store(false, Ordering::Relaxed);
155    }
156
157    /// Получить статистику
158    pub fn stats(&self) -> QueueStatsSnapshot {
159        QueueStatsSnapshot::default()
160    }
161
162    /// Установить значение по умолчанию
163    pub fn set_default(&mut self, value: T) {
164        self.default_value = Some(value);
165    }
166
167    /// Получить политику переполнения
168    pub fn overflow_policy(&self) -> OverflowPolicy {
169        self.overflow_policy
170    }
171
172    /// Установить политику переполнения
173    pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
174        self.overflow_policy = policy;
175    }
176}
177
178impl<T: Copy + Default + Send + Sync, const CAP: usize> RtQueueBase<T> for SpscQueue<T, CAP> {
179    fn push(&self, value: T) -> QueueResult<()> {
180        self.push(value)
181    }
182
183    fn pop(&self) -> Option<T> {
184        self.pop()
185    }
186
187    fn len(&self) -> usize {
188        self.len()
189    }
190
191    fn capacity(&self) -> usize {
192        CAP
193    }
194
195    fn clear(&self) {
196        self.clear();
197    }
198}
199
200impl<T: Copy + Default + fmt::Debug, const CAP: usize> fmt::Debug for SpscQueue<T, CAP> {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        f.debug_struct("SpscQueue")
203            .field("head", &self.head.load(Ordering::Relaxed))
204            .field("tail", &self.tail.load(Ordering::Relaxed))
205            .field("capacity", &CAP)
206            .field("len", &self.len())
207            .field("overflow_policy", &self.overflow_policy)
208            .field("default_value", &self.default_value)
209            .finish()
210    }
211}
212
213impl<T: Copy + Default, const CAP: usize> Default for SpscQueue<T, CAP> {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218#[allow(unsafe_code)]
219unsafe impl<T: Copy + Send, const CAP: usize> Send for SpscQueue<T, CAP> {}
220#[allow(unsafe_code)]
221unsafe impl<T: Copy + Sync, const CAP: usize> Sync for SpscQueue<T, CAP> {}
222
223// =============================================================================
224// Тесты
225// =============================================================================
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230
231    #[test]
232    fn test_spsc_basic() {
233        let queue = SpscQueue::<i32, 4>::new();
234
235        assert!(queue.is_empty());
236        assert_eq!(queue.capacity(), 4);
237        assert_eq!(queue.len(), 0);
238
239        queue.push(1).unwrap();
240        assert_eq!(queue.len(), 1);
241        assert!(!queue.is_empty());
242        assert!(!queue.is_full()); // Не полон после 1 элемента
243
244        queue.push(2).unwrap();
245        queue.push(3).unwrap();
246        queue.push(4).unwrap();
247
248        assert!(queue.is_full()); // Полон после 4 элементов
249        assert_eq!(queue.len(), 4);
250
251        assert_eq!(queue.pop(), Some(1));
252        assert_eq!(queue.pop(), Some(2));
253        assert_eq!(queue.pop(), Some(3));
254        assert_eq!(queue.pop(), Some(4));
255        assert_eq!(queue.pop(), None);
256        assert!(queue.is_empty());
257    }
258
259    #[test]
260    fn test_spsc_overwrite_policy() {
261        let queue = SpscQueue::<i32, 2>::new(); // политика по умолчанию OverwriteOldest
262
263        queue.push(1).unwrap();
264        queue.push(2).unwrap();
265        assert!(queue.is_full());
266
267        // Перезаписываем самый старый (1)
268        queue.push(3).unwrap();
269        assert_eq!(queue.len(), 2);
270
271        // Теперь в очереди [2, 3] (2 стал самым старым)
272        assert_eq!(queue.pop(), Some(2));
273        assert_eq!(queue.pop(), Some(3));
274        assert_eq!(queue.pop(), None);
275    }
276
277    #[test]
278    fn test_spsc_drop_newest_policy() {
279        let queue = SpscQueue::<i32, 2>::with_policies(OverflowPolicy::DropNewest, None);
280
281        queue.push(1).unwrap();
282        queue.push(2).unwrap();
283        assert!(queue.is_full());
284
285        // Должно вернуть ошибку, элемент не добавляется
286        assert!(queue.push(3).is_err());
287
288        // Очередь должна содержать [1, 2] в том же порядке
289        assert_eq!(queue.pop(), Some(1));
290        assert_eq!(queue.pop(), Some(2));
291        assert_eq!(queue.pop(), None);
292    }
293
294    #[test]
295    fn test_spsc_wraparound() {
296        let queue = SpscQueue::<i32, 4>::new();
297
298        // Заполняем
299        queue.push(0).unwrap();
300        queue.push(1).unwrap();
301        queue.push(2).unwrap();
302        queue.push(3).unwrap();
303        assert!(queue.is_full());
304
305        // Извлекаем два
306        assert_eq!(queue.pop(), Some(0));
307        assert_eq!(queue.pop(), Some(1));
308
309        // Добавляем два новых
310        queue.push(4).unwrap();
311        queue.push(5).unwrap();
312        assert!(queue.is_full());
313
314        // Проверяем порядок
315        assert_eq!(queue.pop(), Some(2));
316        assert_eq!(queue.pop(), Some(3));
317        assert_eq!(queue.pop(), Some(4));
318        assert_eq!(queue.pop(), Some(5));
319        assert_eq!(queue.pop(), None);
320    }
321
322    #[test]
323    fn test_spsc_peek() {
324        let queue = SpscQueue::<i32, 4>::new();
325
326        assert_eq!(queue.peek(), None);
327
328        queue.push(42).unwrap();
329        assert_eq!(queue.peek(), Some(42));
330        assert_eq!(queue.len(), 1);
331        assert_eq!(queue.pop(), Some(42));
332        assert_eq!(queue.peek(), None);
333    }
334
335    #[test]
336    fn test_spsc_clear() {
337        let queue = SpscQueue::<i32, 4>::new();
338
339        queue.push(1).unwrap();
340        queue.push(2).unwrap();
341        queue.push(3).unwrap();
342
343        assert_eq!(queue.len(), 3);
344
345        queue.clear();
346        assert_eq!(queue.len(), 0);
347        assert!(queue.is_empty());
348    }
349
350    #[test]
351    fn test_spsc_default_value() {
352        let queue = SpscQueue::<i32, 4>::with_policies(OverflowPolicy::OverwriteOldest, Some(-1));
353
354        assert_eq!(queue.pop(), Some(-1));
355
356        queue.push(42).unwrap();
357        assert_eq!(queue.pop(), Some(42));
358        assert_eq!(queue.pop(), Some(-1));
359    }
360
361    #[test]
362    fn test_spsc_policy_change() {
363        let mut queue = SpscQueue::<i32, 2>::new();
364        assert_eq!(queue.overflow_policy(), OverflowPolicy::OverwriteOldest);
365
366        queue.set_overflow_policy(OverflowPolicy::DropNewest);
367        assert_eq!(queue.overflow_policy(), OverflowPolicy::DropNewest);
368    }
369
370    #[test]
371    #[should_panic(expected = "CAP must be a power of two")]
372    fn test_spsc_invalid_capacity() {
373        let _ = SpscQueue::<i32, 3>::new();
374    }
375}