quill_sql/utils/
ring_buffer.rs

/// Fixed-capacity FIFO ring buffer that overwrites its oldest element once full.
///
/// All mutating operations take `&mut self`; the type does no internal
/// synchronisation.
#[derive(Debug)]
pub struct RingBuffer<T> {
    /// Backing storage; its length is the buffer's capacity and never changes.
    /// A slot is `Some` exactly while it holds a queued element.
    buf: Vec<Option<T>>,
    /// Index in `buf` of the oldest queued element (meaningful while `len > 0`).
    head: usize,
    /// Number of queued elements, always in `0..=buf.len()`.
    len: usize,
}

impl<T> RingBuffer<T> {
    /// Creates an empty buffer that can hold up to `cap` elements.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero: the index arithmetic in [`push`](Self::push)
    /// and [`pop`](Self::pop) reduces modulo the capacity.
    pub fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "ring buffer needs capacity > 0");
        let buf: Vec<Option<T>> = (0..cap).map(|_| None).collect();
        Self {
            buf,
            head: 0,
            len: 0,
        }
    }

    /// Returns the number of elements currently queued.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no element is queued.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the maximum number of elements the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.buf.len()
    }

    /// Appends `item` as the newest element.
    ///
    /// When the buffer is already full the oldest element is dropped to make
    /// room, so `len()` stays equal to `capacity()`.
    pub fn push(&mut self, item: T) {
        let cap = self.buf.len();
        if self.len == cap {
            // Full: the slot at `head` holds the oldest element. Reuse it for
            // the newest one and advance `head` to the next-oldest element.
            self.buf[self.head] = Some(item);
            self.head = (self.head + 1) % cap;
        } else {
            // Room left: the first free slot sits `len` positions after `head`.
            let tail = (self.head + self.len) % cap;
            self.buf[tail] = Some(item);
            self.len += 1;
        }
    }

    /// Removes and returns the oldest element, or `None` if the buffer is empty.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let item = self.buf[self.head].take();
        self.head = (self.head + 1) % self.buf.len();
        self.len -= 1;
        item
    }
}
53
54use std::cell::UnsafeCell;
55use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
56
/// Slot lock byte: no thread currently owns the slot's stored value.
const SLOT_UNLOCKED: u8 = 0;
/// Slot lock byte: a consumer or a cloning reader currently owns the stored value.
const SLOT_LOCKED: u8 = 1;
/// Busy-wait iterations a waiter performs before it starts yielding the CPU.
const SPINS_BEFORE_YIELD: u32 = 64;

/// Stamp of a slot that is vacant and reserved for the producer of `cursor`.
///
/// Stamps encode the cursor (and therefore the lap around the ring), so a slot
/// being reused for `cursor + capacity` can never be mistaken for `cursor`.
/// Cursors would have to exceed 2^63 before the encoding wraps.
#[inline]
fn vacant_stamp(cursor: u64) -> u64 {
    cursor.wrapping_mul(2)
}

/// Stamp of a slot that holds the value written for `cursor`.
#[inline]
fn filled_stamp(cursor: u64) -> u64 {
    cursor.wrapping_mul(2).wrapping_add(1)
}

/// One step of a wait loop: spin briefly, then yield so that the thread being
/// waited on can run even when cores are oversubscribed.
#[inline]
fn backoff(spins: &mut u32) {
    if *spins < SPINS_BEFORE_YIELD {
        *spins += 1;
        std::hint::spin_loop();
    } else {
        std::thread::yield_now();
    }
}

/// One cell of the concurrent ring.
#[derive(Debug)]
struct Slot<T> {
    /// The stored element; `Some` exactly while `stamp` is a "filled" stamp.
    value: UnsafeCell<Option<T>>,
    /// Lap-aware hand-off marker, see [`vacant_stamp`] and [`filled_stamp`].
    stamp: AtomicU64,
    /// Tiny spin lock (`SLOT_UNLOCKED` / `SLOT_LOCKED`) that serialises the
    /// consumer's `take` against readers cloning the value.
    lock: AtomicU8,
}

/// RAII guard for a slot's lock byte; unlocking in `Drop` keeps the slot usable
/// even if a user-supplied `Clone` panics while the lock is held.
struct SlotGuard<'a, T: 'a> {
    slot: &'a Slot<T>,
}

impl<'a, T: 'a> Drop for SlotGuard<'a, T> {
    fn drop(&mut self) {
        self.slot.lock.store(SLOT_UNLOCKED, Ordering::Release);
    }
}

impl<T> Slot<T> {
    /// Creates an empty slot whose first user is the producer of `first_cursor`.
    fn new(first_cursor: u64) -> Self {
        Self {
            value: UnsafeCell::new(None),
            stamp: AtomicU64::new(vacant_stamp(first_cursor)),
            lock: AtomicU8::new(SLOT_UNLOCKED),
        }
    }

    /// Acquires the slot lock, waiting until it is free.
    fn lock<'a>(&'a self) -> SlotGuard<'a, T> {
        let mut spins = 0;
        while self
            .lock
            .compare_exchange_weak(
                SLOT_UNLOCKED,
                SLOT_LOCKED,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_err()
        {
            backoff(&mut spins);
        }
        SlotGuard { slot: self }
    }
}

// SAFETY: a slot owns its `T`; moving the slot to another thread moves the `T`.
unsafe impl<T: Send> Send for Slot<T> {}
// SAFETY: every access to `value` is exclusive. The producer writes only while
// the stamp reserves the slot for it, and consumers/readers touch the value only
// while holding the slot lock. Values are handed between threads but never
// shared, so `T: Send` is sufficient.
unsafe impl<T: Send> Sync for Slot<T> {}

/// Bounded multi-producer / multi-consumer FIFO queue over a fixed ring of slots.
///
/// Producers and consumers claim monotonically increasing cursors with a CAS on
/// `tail` / `head` and then hand the value over through the slot the cursor maps
/// to. A claimant waits (spin, then yield) until the slot reaches the stamp for
/// its own cursor, so a stalled thread can delay the peer paired with it.
#[derive(Debug)]
pub struct ConcurrentRingBuffer<T> {
    /// The ring storage; `slots.len() == capacity`.
    slots: Vec<Slot<T>>,
    /// Number of slots, fixed at construction and always non-zero.
    capacity: usize,
    /// Cursor of the next element to pop.
    head: AtomicU64,
    /// Cursor the next push will claim.
    tail: AtomicU64,
}

// SAFETY: the buffer only contains `Slot<T>`s and plain atomics; see the
// reasoning on `Slot`'s impls.
unsafe impl<T: Send> Send for ConcurrentRingBuffer<T> {}
// SAFETY: as above; all shared-state access goes through atomics or the
// per-slot stamp/lock protocol.
unsafe impl<T: Send> Sync for ConcurrentRingBuffer<T> {}

impl<T> ConcurrentRingBuffer<T> {
    /// Creates an empty queue that can hold up to `cap` elements.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero.
    pub fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "concurrent ring buffer needs capacity > 0");
        // Slot `i` is first used by the producer that claims cursor `i`.
        let slots: Vec<Slot<T>> = (0..cap as u64).map(Slot::new).collect();
        Self {
            slots,
            capacity: cap,
            head: AtomicU64::new(0),
            tail: AtomicU64::new(0),
        }
    }

    /// Maps a cursor to its slot index.
    #[inline]
    fn index(&self, cursor: u64) -> usize {
        // Reduce in `u64` first: casting the cursor to `usize` before the modulo
        // would truncate it on 32-bit targets and break the cursor/slot mapping.
        (cursor % self.capacity as u64) as usize
    }

    /// Returns the number of claimed-but-not-yet-popped elements.
    ///
    /// Under concurrent use the value is a snapshot that may be stale by the
    /// time it is returned; it never exceeds the capacity.
    #[inline]
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        // `head` is read first, so pops racing with the two loads can only
        // inflate the difference; clamp it back to what the ring can hold.
        tail.saturating_sub(head).min(self.capacity as u64) as usize
    }

    /// Returns `true` when [`len`](Self::len) observes no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends `value`, or returns it in `Err` when the queue is full.
    ///
    /// After winning a cursor the call may wait briefly for the consumer that
    /// is still draining the same slot from the previous lap.
    pub fn try_push(&self, value: T) -> Result<(), T> {
        let mut retries = 0;
        loop {
            // `head` is loaded before `tail`, so a stale `head` can only make the
            // queue look fuller than it is; it can never cause an over-commit.
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Relaxed);
            if tail.wrapping_sub(head) >= self.capacity as u64 {
                return Err(value);
            }
            if self
                .tail
                .compare_exchange(
                    tail,
                    tail.wrapping_add(1),
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                let slot = &self.slots[self.index(tail)];
                // Wait for *this lap's* vacancy. A plain empty/ready flag cannot
                // tell laps apart and would let the producers of `tail` and
                // `tail + capacity` write the same slot concurrently.
                let vacant = vacant_stamp(tail);
                let mut spins = 0;
                while slot.stamp.load(Ordering::Acquire) != vacant {
                    backoff(&mut spins);
                }
                // SAFETY: the stamp reserves the slot for cursor `tail`, which
                // only this thread claimed. The previous lap's consumer published
                // this stamp after it finished with the value, and readers never
                // touch the value unless the stamp is "filled", so access is
                // exclusive.
                unsafe {
                    *slot.value.get() = Some(value);
                }
                slot.stamp.store(filled_stamp(tail), Ordering::Release);
                return Ok(());
            }
            // Lost the race for this cursor; retry with the same value.
            backoff(&mut retries);
        }
    }

    /// Removes and returns the oldest element, or `None` when the queue is empty.
    ///
    /// After winning a cursor the call may wait briefly for the producer that
    /// claimed the same cursor to finish writing.
    pub fn pop(&self) -> Option<T> {
        let mut retries = 0;
        loop {
            let head = self.head.load(Ordering::Relaxed);
            let tail = self.tail.load(Ordering::Acquire);
            if head >= tail {
                return None;
            }
            if self
                .head
                .compare_exchange(
                    head,
                    head.wrapping_add(1),
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                let slot = &self.slots[self.index(head)];
                let filled = filled_stamp(head);
                let mut spins = 0;
                while slot.stamp.load(Ordering::Acquire) != filled {
                    backoff(&mut spins);
                }
                // The lock keeps `peek_clone` / `snapshot` from cloning the value
                // while it is being moved out.
                let guard = slot.lock();
                // SAFETY: the stamp shows the producer of `head` has finished,
                // this thread is the only consumer of `head`, and the slot lock
                // excludes readers.
                let item = unsafe { (*slot.value.get()).take() };
                // Publish the vacancy for the next lap *before* unlocking, so a
                // reader that acquires the lock afterwards sees the new stamp
                // and leaves the value alone.
                slot.stamp.store(
                    vacant_stamp(head.wrapping_add(self.capacity as u64)),
                    Ordering::Release,
                );
                drop(guard);
                return item;
            }
            backoff(&mut retries);
        }
    }

    /// Clones the value stored for `cursor`, if that exact cursor's value is
    /// currently present in its slot.
    fn clone_cursor(&self, cursor: u64) -> Option<T>
    where
        T: Clone,
    {
        let slot = &self.slots[self.index(cursor)];
        let filled = filled_stamp(cursor);
        // Cheap pre-check so readers do not contend on slots that cannot match.
        if slot.stamp.load(Ordering::Acquire) != filled {
            return None;
        }
        let _guard = slot.lock();
        // Re-check under the lock: a consumer may have emptied the slot between
        // the pre-check and the lock acquisition.
        if slot.stamp.load(Ordering::Acquire) != filled {
            return None;
        }
        // SAFETY: the lock is held and the stamp is "filled", so the producer is
        // done, any consumer is blocked on the lock, and the next lap's producer
        // cannot start until a consumer republishes the stamp.
        unsafe { (*slot.value.get()).clone() }
    }

    /// Returns a clone of the oldest element without removing it.
    ///
    /// Returns `None` when the queue is empty, and also when the head element is
    /// not available at the moment of the call (still being written, or popped
    /// concurrently).
    pub fn peek_clone(&self) -> Option<T>
    where
        T: Clone,
    {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        if head >= tail {
            return None;
        }
        self.clone_cursor(head)
    }

    /// Returns clones of the queued elements, oldest first.
    ///
    /// Under concurrent use this is best effort: elements that are mid-write or
    /// popped while the snapshot is taken are skipped.
    pub fn snapshot(&self) -> Vec<T>
    where
        T: Clone,
    {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        // At most `capacity` cursors can be live; a stale `head` must not make
        // the scan walk the ring more than once.
        let end = tail.min(head.saturating_add(self.capacity as u64));
        (head..end)
            .filter_map(|cursor| self.clone_cursor(cursor))
            .collect()
    }
}

impl<T> Drop for ConcurrentRingBuffer<T> {
    /// Drops every element still queued.
    fn drop(&mut self) {
        // `&mut self` proves no other thread can reach the slots, so the values
        // are reachable through safe `get_mut` without the stamp/lock protocol.
        for slot in &mut self.slots {
            drop(slot.value.get_mut().take());
        }
    }
}