quill_sql/utils/
ring_buffer.rs1#[derive(Debug)]
2pub struct RingBuffer<T> {
3 buf: Vec<Option<T>>,
4 head: usize,
5 len: usize,
6}
7
8impl<T> RingBuffer<T> {
9 pub fn with_capacity(cap: usize) -> Self {
10 assert!(cap > 0);
11 let mut buf = Vec::with_capacity(cap);
12 buf.resize_with(cap, || None);
13 Self {
14 buf,
15 head: 0,
16 len: 0,
17 }
18 }
19
20 pub fn len(&self) -> usize {
21 self.len
22 }
23 pub fn is_empty(&self) -> bool {
24 self.len == 0
25 }
26 pub fn capacity(&self) -> usize {
27 self.buf.len()
28 }
29
30 pub fn push(&mut self, item: T) {
31 let cap = self.buf.len();
32 if self.len < cap {
33 let tail = (self.head + self.len) % cap;
34 self.buf[tail] = Some(item);
35 self.len += 1;
36 } else {
37 self.buf[self.head] = Some(item);
38 self.head = (self.head + 1) % cap;
39 }
40 }
41
42 pub fn pop(&mut self) -> Option<T> {
43 if self.len == 0 {
44 return None;
45 }
46 let idx = self.head;
47 let item = self.buf[idx].take();
48 self.head = (self.head + 1) % self.buf.len();
49 self.len -= 1;
50 item
51 }
52}
53
54use std::cell::UnsafeCell;
55use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
56
57const SLOT_EMPTY: u8 = 0;
58const SLOT_READY: u8 = 1;
59
60#[derive(Debug)]
61struct Slot<T> {
62 value: UnsafeCell<Option<T>>,
63 state: AtomicU8,
64}
65
66impl<T> Slot<T> {
67 fn new() -> Self {
68 Self {
69 value: UnsafeCell::new(None),
70 state: AtomicU8::new(SLOT_EMPTY),
71 }
72 }
73}
74
75unsafe impl<T: Send> Send for Slot<T> {}
76unsafe impl<T: Send> Sync for Slot<T> {}
77
78#[derive(Debug)]
79pub struct ConcurrentRingBuffer<T> {
80 slots: Vec<Slot<T>>,
81 capacity: usize,
82 head: AtomicU64,
83 tail: AtomicU64,
84}
85
86unsafe impl<T: Send> Send for ConcurrentRingBuffer<T> {}
87unsafe impl<T: Send> Sync for ConcurrentRingBuffer<T> {}
88
89impl<T> ConcurrentRingBuffer<T> {
90 pub fn with_capacity(cap: usize) -> Self {
91 assert!(cap > 0, "concurrent ring buffer needs capacity > 0");
92 let mut slots = Vec::with_capacity(cap);
93 for _ in 0..cap {
94 slots.push(Slot::new());
95 }
96 Self {
97 slots,
98 capacity: cap,
99 head: AtomicU64::new(0),
100 tail: AtomicU64::new(0),
101 }
102 }
103
104 #[inline]
105 fn index(&self, cursor: u64) -> usize {
106 (cursor as usize) % self.capacity
107 }
108
109 #[inline]
110 pub fn len(&self) -> usize {
111 let head = self.head.load(Ordering::Acquire);
112 let tail = self.tail.load(Ordering::Acquire);
113 tail.saturating_sub(head) as usize
114 }
115
116 #[inline]
117 pub fn is_empty(&self) -> bool {
118 self.len() == 0
119 }
120
121 pub fn try_push(&self, value: T) -> Result<(), T> {
122 loop {
123 let head = self.head.load(Ordering::Acquire);
124 let tail = self.tail.load(Ordering::Relaxed);
125 if tail.wrapping_sub(head) as usize >= self.capacity {
126 return Err(value);
127 }
128 if self
129 .tail
130 .compare_exchange(tail, tail + 1, Ordering::AcqRel, Ordering::Relaxed)
131 .is_ok()
132 {
133 let slot = &self.slots[self.index(tail)];
134 while slot.state.load(Ordering::Acquire) != SLOT_EMPTY {
135 std::hint::spin_loop();
136 }
137 unsafe {
138 *slot.value.get() = Some(value);
139 }
140 slot.state.store(SLOT_READY, Ordering::Release);
141 return Ok(());
142 }
143 std::hint::spin_loop();
145 }
146 }
147
148 pub fn pop(&self) -> Option<T> {
149 loop {
150 let head = self.head.load(Ordering::Relaxed);
151 let tail = self.tail.load(Ordering::Acquire);
152 if head >= tail {
153 return None;
154 }
155 if self
156 .head
157 .compare_exchange(head, head + 1, Ordering::AcqRel, Ordering::Relaxed)
158 .is_ok()
159 {
160 let slot = &self.slots[self.index(head)];
161 while slot.state.load(Ordering::Acquire) != SLOT_READY {
162 std::hint::spin_loop();
163 }
164 let item = unsafe { (*slot.value.get()).take() };
165 slot.state.store(SLOT_EMPTY, Ordering::Release);
166 return item;
167 }
168 std::hint::spin_loop();
169 }
170 }
171
172 pub fn peek_clone(&self) -> Option<T>
173 where
174 T: Clone,
175 {
176 let head = self.head.load(Ordering::Acquire);
177 let tail = self.tail.load(Ordering::Acquire);
178 if head >= tail {
179 return None;
180 }
181 let slot = &self.slots[self.index(head)];
182 if slot.state.load(Ordering::Acquire) != SLOT_READY {
183 return None;
184 }
185 unsafe { (*slot.value.get()).as_ref().cloned() }
186 }
187
188 pub fn snapshot(&self) -> Vec<T>
189 where
190 T: Clone,
191 {
192 let mut items = Vec::new();
193 let head = self.head.load(Ordering::Acquire);
194 let tail = self.tail.load(Ordering::Acquire);
195 for cursor in head..tail {
196 let slot = &self.slots[self.index(cursor)];
197 if slot.state.load(Ordering::Acquire) == SLOT_READY {
198 if let Some(value) = unsafe { (*slot.value.get()).as_ref() } {
199 items.push(value.clone());
200 }
201 }
202 }
203 items
204 }
205}
206
207impl<T> Drop for ConcurrentRingBuffer<T> {
208 fn drop(&mut self) {
209 for slot in &self.slots {
210 unsafe {
211 let _ = (*slot.value.get()).take();
212 }
213 }
214 }
215}