1use slab::Slab;
4use std::sync::{Arc, Mutex};
5use thiserror::Error;
6
7use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
8
9#[derive(Error, Debug)]
11pub enum CircularError {
12 #[error("Failed to allocate double mapped buffer.")]
14 Allocation(DoubleMappedBufferError),
15}
16
/// Wake-up hook the buffer uses to signal the other side of the queue.
///
/// Every reader is registered with two notifiers (see `Writer::add_reader`):
/// a *reader notifier*, notified when the writer produces items or is
/// dropped, and a *writer notifier*, notified when that reader consumes
/// items or is dropped.
///
/// Both methods are called while the buffer's internal state lock is held,
/// so re-entering the same buffer from inside them would try to take that
/// lock again.
pub trait Notifier {
    /// Request a future notification.
    ///
    /// Called when a `slice` call made with `arm == true` finds nothing
    /// available (no free space for the writer, no items for the reader).
    fn arm(&mut self);

    /// Signal that the situation the owner may be waiting for has changed.
    fn notify(&mut self);
}
29
/// Per-reader side channel that carries tags alongside the buffered items.
///
/// One instance is created for every reader when it is added to the buffer.
pub trait Metadata {
    /// A single tag. Tags are cloned once per reader on every `produce`.
    type Item: Clone;

    /// Create an empty metadata store for a newly added reader.
    fn new() -> Self;

    /// Record `tags` that accompany freshly produced items.
    ///
    /// `offset` is the number of items the reader had not yet consumed at
    /// the moment of the `produce` call, i.e. where the new items start
    /// relative to the reader's current read position.
    fn add(&mut self, offset: usize, tags: Vec<Self::Item>);

    /// Return the tags currently stored; handed out together with the
    /// reader's slice.
    fn get(&self) -> Vec<Self::Item>;

    /// Inform the store that the reader consumed `items` items.
    fn consume(&mut self, items: usize);
}
43
44pub struct NoMetadata;
46impl Metadata for NoMetadata {
47 type Item = ();
48
49 fn new() -> Self {
50 Self
51 }
52 fn add(&mut self, _offset: usize, _tags: Vec<Self::Item>) {}
53 fn get(&self) -> Vec<Self::Item> {
54 Vec::new()
55 }
56 fn consume(&mut self, _items: usize) {}
57}
58
59pub struct Circular;
61
62impl Circular {
63 pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
67 where
68 N: Notifier,
69 M: Metadata,
70 {
71 let buffer = match DoubleMappedBuffer::new(min_items) {
72 Ok(buffer) => Arc::new(buffer),
73 Err(e) => return Err(CircularError::Allocation(e)),
74 };
75
76 let state = Arc::new(Mutex::new(State {
77 writer_offset: 0,
78 writer_ab: false,
79 writer_done: false,
80 readers: Slab::new(),
81 }));
82
83 let writer = Writer {
84 buffer,
85 state,
86 last_space: 0,
87 };
88
89 Ok(writer)
90 }
91}
92
93struct State<N, M>
94where
95 N: Notifier,
96 M: Metadata,
97{
98 writer_offset: usize,
99 writer_ab: bool,
100 writer_done: bool,
101 readers: Slab<ReaderState<N, M>>,
102}
/// Per-reader bookkeeping stored inside the shared state.
struct ReaderState<N, M> {
    /// Wrap flag, flipped each time `offset` wraps around the buffer.
    ab: bool,
    /// Index of the next item this reader will consume.
    offset: usize,
    /// Notified when the writer produces items or is dropped.
    reader_notifier: N,
    /// Notified when this reader consumes items or is dropped.
    writer_notifier: N,
    /// Tags attached to the items this reader has not consumed yet.
    meta: M,
}
110
111pub struct Writer<T, N, M>
113where
114 N: Notifier,
115 M: Metadata,
116{
117 last_space: usize,
118 buffer: Arc<DoubleMappedBuffer<T>>,
119 state: Arc<Mutex<State<N, M>>>,
120}
121
122impl<T, N, M> Writer<T, N, M>
123where
124 N: Notifier,
125 M: Metadata,
126{
127 pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
129 let mut state = self.state.lock().unwrap();
130 let reader_state = ReaderState {
131 ab: state.writer_ab,
132 offset: state.writer_offset,
133 reader_notifier,
134 writer_notifier,
135 meta: M::new(),
136 };
137 let id = state.readers.insert(reader_state);
138
139 Reader {
140 id,
141 last_space: 0,
142 buffer: self.buffer.clone(),
143 state: self.state.clone(),
144 }
145 }
146
147 fn space_and_offset(&self, arm: bool) -> (usize, usize) {
148 let mut state = self.state.lock().unwrap();
149 let capacity = self.buffer.capacity();
150 let w_off = state.writer_offset;
151 let w_ab = state.writer_ab;
152
153 let mut space = capacity;
154
155 for (_, reader) in state.readers.iter_mut() {
156 let r_off = reader.offset;
157 let r_ab = reader.ab;
158
159 let s = if w_off > r_off {
160 r_off + capacity - w_off
161 } else if w_off < r_off {
162 r_off - w_off
163 } else if r_ab == w_ab {
164 capacity
165 } else {
166 0
167 };
168
169 space = std::cmp::min(space, s);
170
171 if s == 0 && arm {
172 reader.writer_notifier.arm();
173 break;
174 }
175 if s == 0 {
176 break;
177 }
178 }
179
180 (space, w_off)
181 }
182
183 pub fn slice(&mut self, arm: bool) -> &mut [T] {
185 let (space, offset) = self.space_and_offset(arm);
186 self.last_space = space;
187 unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
188 }
189
190 pub fn produce(&mut self, n: usize, meta: Vec<M::Item>) {
198 if n == 0 {
199 return;
200 }
201
202 debug_assert!(self.space_and_offset(false).0 >= n);
203
204 assert!(n <= self.last_space, "vmcircbuffer: produced too much");
205 self.last_space -= n;
206
207 let mut state = self.state.lock().unwrap();
208
209 let w_off = state.writer_offset;
210 let w_ab = state.writer_ab;
211 let capacity = self.buffer.capacity();
212
213 for (_, r) in state.readers.iter_mut() {
214 let r_off = r.offset;
215 let r_ab = r.ab;
216
217 let space = if r_off > w_off {
218 w_off + capacity - r_off
219 } else if r_off < w_off {
220 w_off - r_off
221 } else if r_ab == w_ab {
222 0
223 } else {
224 capacity
225 };
226
227 r.meta.add(space, meta.clone());
228 r.reader_notifier.notify();
229 }
230
231 if state.writer_offset + n >= self.buffer.capacity() {
232 state.writer_ab = !state.writer_ab;
233 }
234 state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
235 }
236}
237
238impl<T, N, M> Drop for Writer<T, N, M>
239where
240 N: Notifier,
241 M: Metadata,
242{
243 fn drop(&mut self) {
244 let mut state = self.state.lock().unwrap();
245 state.writer_done = true;
246 for (_, r) in state.readers.iter_mut() {
247 r.reader_notifier.notify();
248 }
249 }
250}
251
252pub struct Reader<T, N, M>
254where
255 N: Notifier,
256 M: Metadata,
257{
258 id: usize,
259 last_space: usize,
260 buffer: Arc<DoubleMappedBuffer<T>>,
261 state: Arc<Mutex<State<N, M>>>,
262}
263
264impl<T, N, M> Reader<T, N, M>
265where
266 N: Notifier,
267 M: Metadata,
268{
269 fn space_and_offset_and_meta(&self, arm: bool) -> (usize, usize, bool, Vec<M::Item>) {
270 let mut state = self.state.lock().unwrap();
271
272 let capacity = self.buffer.capacity();
273 let done = state.writer_done;
274 let w_off = state.writer_offset;
275 let w_ab = state.writer_ab;
276
277 let my = unsafe { state.readers.get_unchecked_mut(self.id) };
278 let r_off = my.offset;
279 let r_ab = my.ab;
280
281 let space = if r_off > w_off {
282 w_off + capacity - r_off
283 } else if r_off < w_off {
284 w_off - r_off
285 } else if r_ab == w_ab {
286 0
287 } else {
288 capacity
289 };
290
291 if space == 0 && arm {
292 my.reader_notifier.arm();
293 }
294
295 (space, r_off, done, my.meta.get())
296 }
297
298 pub fn slice(&mut self, arm: bool) -> Option<(&[T], Vec<M::Item>)> {
302 let (space, offset, done, tags) = self.space_and_offset_and_meta(arm);
303 self.last_space = space;
304 if space == 0 && done {
305 None
306 } else {
307 unsafe { Some((&self.buffer.slice_with_offset(offset)[0..space], tags)) }
308 }
309 }
310
311 pub fn consume(&mut self, n: usize) {
317 if n == 0 {
318 return;
319 }
320
321 debug_assert!(self.space_and_offset_and_meta(false).0 >= n);
322
323 assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
324 self.last_space -= n;
325
326 let mut state = self.state.lock().unwrap();
327 let my = unsafe { state.readers.get_unchecked_mut(self.id) };
328
329 my.meta.consume(n);
330
331 if my.offset + n >= self.buffer.capacity() {
332 my.ab = !my.ab;
333 }
334 my.offset = (my.offset + n) % self.buffer.capacity();
335
336 my.writer_notifier.notify();
337 }
338}
339
340impl<T, N, M> Drop for Reader<T, N, M>
341where
342 N: Notifier,
343 M: Metadata,
344{
345 fn drop(&mut self) {
346 let mut state = self.state.lock().unwrap();
347 let mut s = state.readers.remove(self.id);
348 s.writer_notifier.notify();
349 }
350}