1use slab::Slab;
4use spin::Mutex;
5use std::sync::Arc;
6use thiserror::Error;
7
8use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
9
10#[derive(Error, Debug)]
12pub enum CircularError {
13 #[error("Failed to allocate double mapped buffer.")]
15 Allocation(DoubleMappedBufferError),
16}
17
18pub use crate::{Metadata, NoMetadata, Notifier};
19
20pub struct Circular;
22
23impl Circular {
24 pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
28 where
29 N: Notifier,
30 M: Metadata,
31 {
32 let buffer = match DoubleMappedBuffer::new(min_items) {
33 Ok(buffer) => Arc::new(buffer),
34 Err(e) => return Err(CircularError::Allocation(e)),
35 };
36
37 let state = Arc::new(Mutex::new(State {
38 writer_offset: 0,
39 writer_ab: false,
40 writer_done: false,
41 readers: Slab::new(),
42 }));
43
44 let writer = Writer {
45 buffer,
46 state,
47 last_space: 0,
48 };
49
50 Ok(writer)
51 }
52}
53
/// State shared between the writer and all readers, protected by a mutex.
struct State<N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Write position in items; kept in [0, capacity) by the modulo in
    // `Writer::produce`.
    writer_offset: usize,
    // Wrap-around toggle, flipped each time the writer wraps. Comparing it
    // with a reader's toggle disambiguates "full" from "empty" when the
    // two offsets are equal.
    writer_ab: bool,
    // Set in `Writer::drop`; readers drain remaining items, then see EOF.
    writer_done: bool,
    // Per-reader state, keyed by the id stored in each `Reader`.
    readers: Slab<ReaderState<N, M>>,
}
/// Per-reader bookkeeping, stored in `State::readers`.
struct ReaderState<N, M> {
    // Reader's wrap-around toggle; compared against `State::writer_ab` to
    // tell a full buffer from an empty one when offsets coincide.
    ab: bool,
    // Read position in items; kept in [0, capacity) by `Reader::consume`.
    offset: usize,
    // Notified whenever the writer produces items (or is dropped).
    reader_notifier: N,
    // Notified when this reader consumes, freeing space for the writer.
    writer_notifier: N,
    // Metadata stream attached to the items this reader receives.
    meta: M,
}
71
/// Writer endpoint of the circular buffer.
///
/// Created by [`Circular::with_capacity`]; readers are attached with
/// [`Writer::add_reader`].
pub struct Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Space reported by the most recent `slice()` call; `produce()` asserts
    // against it so the caller cannot claim more than was handed out.
    last_space: usize,
    buffer: Arc<DoubleMappedBuffer<T>>,
    state: Arc<Mutex<State<N, M>>>,
}
82
impl<T, N, M> Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Number of items the writer can produce before overrunning the reader
    /// at `(r_off, r_ab)`.
    ///
    /// When the offsets are equal the `ab` toggles disambiguate: equal
    /// toggles mean the reader has caught up (buffer empty, full `capacity`
    /// writable); differing toggles mean the writer is a whole lap ahead
    /// (buffer full, nothing writable).
    #[inline(always)]
    fn writer_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
        if w_off > r_off {
            r_off + capacity - w_off
        } else if w_off < r_off {
            r_off - w_off
        } else if r_ab == w_ab {
            capacity
        } else {
            0
        }
    }

    /// Attach a new reader.
    ///
    /// The reader starts at the writer's current position (it only sees
    /// items produced after this call). `reader_notifier` is signalled when
    /// data becomes available; `writer_notifier` is signalled when this
    /// reader frees space.
    pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
        let mut state = self.state.lock();
        let reader_state = ReaderState {
            // Start in sync with the writer => zero readable items.
            ab: state.writer_ab,
            offset: state.writer_offset,
            reader_notifier,
            writer_notifier,
            meta: M::new(),
        };
        let id = state.readers.insert(reader_state);

        Reader {
            id,
            last_space: 0,
            buffer: self.buffer.clone(),
            state: self.state.clone(),
        }
    }

    /// Compute `(writable items, write offset)` with the state lock held.
    ///
    /// The writable space is the minimum over all readers (the slowest
    /// reader limits the writer). If `arm` is set and some reader leaves no
    /// space, that reader's `writer_notifier` is armed so the writer gets
    /// woken when it consumes.
    #[inline(always)]
    fn space_and_offset_locked(
        state: &mut State<N, M>,
        capacity: usize,
        arm: bool,
    ) -> (usize, usize) {
        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;

        // With no readers attached, the whole capacity is writable.
        let mut space = capacity;

        for (_, reader) in state.readers.iter_mut() {
            let s = Self::writer_space(capacity, w_off, w_ab, reader.offset, reader.ab);

            space = std::cmp::min(space, s);

            // Space cannot drop below zero, so stop at the first fully
            // blocking reader; arming one blocked reader is sufficient to
            // get a wake-up.
            if s == 0 && arm {
                reader.writer_notifier.arm();
                break;
            }
            if s == 0 {
                break;
            }
        }

        (space, w_off)
    }

    /// Get a mutable slice of the currently writable region.
    ///
    /// If `arm` is set and the buffer is full, a wake-up is armed for when
    /// space becomes available. Call `produce` afterwards to publish the
    /// items actually written.
    pub fn slice(&mut self, arm: bool) -> &mut [T] {
        let mut state = self.state.lock();
        let (space, offset) =
            Self::space_and_offset_locked(&mut state, self.buffer.capacity(), arm);
        self.last_space = space;
        // SAFETY: `offset`/`space` were computed under the lock and denote
        // the region no reader can currently access; the double mapping
        // lets the slice wrap the physical end of the buffer.
        // NOTE(review): soundness ultimately rests on the contract of
        // `DoubleMappedBuffer::slice_with_offset_mut` — confirm there.
        unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
    }

    /// Publish `n` items previously written into the slice returned by
    /// `slice`, optionally attaching metadata, and notify all readers.
    ///
    /// # Panics
    ///
    /// Panics if `n` exceeds the space reported by the last `slice` call.
    pub fn produce(&mut self, n: usize, meta: &[M::Item]) {
        if n == 0 {
            return;
        }

        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
        self.last_space -= n;

        let mut state = self.state.lock();
        let capacity = self.buffer.capacity();

        // Re-derive the space under the lock to catch misuse in debug builds.
        debug_assert!(Self::space_and_offset_locked(&mut state, capacity, false).0 >= n);

        let w_off = state.writer_offset;
        let w_ab = state.writer_ab;

        for (_, r) in state.readers.iter_mut() {
            // Offset of the new items relative to this reader's position:
            // everything currently readable precedes them.
            let space = Reader::<T, N, M>::reader_space(capacity, w_off, w_ab, r.offset, r.ab);

            if !meta.is_empty() {
                r.meta.add_from_slice(space, meta);
            }
            r.reader_notifier.notify();
        }

        // Flip the wrap toggle when the write position wraps past the end,
        // then advance modulo capacity.
        if state.writer_offset + n >= self.buffer.capacity() {
            state.writer_ab = !state.writer_ab;
        }
        state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
    }
}
196
197impl<T, N, M> Drop for Writer<T, N, M>
198where
199 N: Notifier,
200 M: Metadata,
201{
202 fn drop(&mut self) {
203 let mut state = self.state.lock();
204 state.writer_done = true;
205 for (_, r) in state.readers.iter_mut() {
206 r.reader_notifier.notify();
207 }
208 }
209}
210
/// Reader endpoint of the circular buffer.
///
/// Created by [`Writer::add_reader`]. Dropping a reader detaches it from
/// the shared state.
pub struct Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    // Key of this reader's `ReaderState` slot in `State::readers`.
    id: usize,
    // Space reported by the most recent `slice()`/`slice_with_metadata_into()`
    // call; `consume()` asserts against it.
    last_space: usize,
    buffer: Arc<DoubleMappedBuffer<T>>,
    state: Arc<Mutex<State<N, M>>>,
}
222
223impl<T, N, M> Reader<T, N, M>
224where
225 N: Notifier,
226 M: Metadata,
227{
228 #[inline(always)]
229 fn reader_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
230 if r_off > w_off {
231 w_off + capacity - r_off
232 } else if r_off < w_off {
233 w_off - r_off
234 } else if r_ab == w_ab {
235 0
236 } else {
237 capacity
238 }
239 }
240
241 #[inline(always)]
242 fn space_and_offset_locked(
243 state: &mut State<N, M>,
244 id: usize,
245 capacity: usize,
246 arm: bool,
247 ) -> (usize, usize, bool) {
248 let done = state.writer_done;
249 let w_off = state.writer_offset;
250 let w_ab = state.writer_ab;
251
252 let my = unsafe { state.readers.get_unchecked_mut(id) };
253 let space = Self::reader_space(capacity, w_off, w_ab, my.offset, my.ab);
254
255 if space == 0 && arm {
256 my.reader_notifier.arm();
257 }
258
259 (space, my.offset, done)
260 }
261
262 pub fn slice(&mut self, arm: bool) -> Option<&[T]> {
264 let mut state = self.state.lock();
265 let (space, offset, done) =
266 Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
267 self.last_space = space;
268
269 if space == 0 && done {
270 return None;
271 }
272
273 unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
274 }
275
276 pub fn slice_with_metadata_into(&mut self, arm: bool, out: &mut Vec<M::Item>) -> Option<&[T]> {
278 let mut state = self.state.lock();
279 let (space, offset, done) =
280 Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
281 let my = unsafe { state.readers.get_unchecked_mut(self.id) };
282
283 my.meta.get_into(out);
284 self.last_space = space;
285
286 if space == 0 && done {
287 out.clear();
288 return None;
289 }
290
291 unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
292 }
293
294 pub fn consume(&mut self, n: usize) {
300 if n == 0 {
301 return;
302 }
303
304 assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
305 self.last_space -= n;
306
307 let mut state = self.state.lock();
308 debug_assert!(
309 Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), false).0
310 >= n
311 );
312 let my = unsafe { state.readers.get_unchecked_mut(self.id) };
313
314 my.meta.consume(n);
315
316 if my.offset + n >= self.buffer.capacity() {
317 my.ab = !my.ab;
318 }
319 my.offset = (my.offset + n) % self.buffer.capacity();
320
321 my.writer_notifier.notify();
322 }
323}
324
325impl<T, N, M> Drop for Reader<T, N, M>
326where
327 N: Notifier,
328 M: Metadata,
329{
330 fn drop(&mut self) {
331 let mut state = self.state.lock();
332 let mut s = state.readers.remove(self.id);
333 s.writer_notifier.notify();
334 }
335}