vmcircbuffer/
generic.rs

1//! Circular Buffer with generic [Notifier] to implement custom wait/block behavior.
2
3use slab::Slab;
4use std::sync::{Arc, Mutex};
5use thiserror::Error;
6
7use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
8
/// Error setting up the underlying buffer.
///
/// Returned by [`Circular::with_capacity`].
#[derive(Error, Debug)]
pub enum CircularError {
    /// Failed to allocate double mapped buffer.
    ///
    /// Carries the error reported by [`DoubleMappedBuffer::new`]. The
    /// `Display` text is the fixed message below and does not include the
    /// wrapped error, so inspect the payload for the underlying cause.
    #[error("Failed to allocate double mapped buffer.")]
    Allocation(DoubleMappedBufferError),
}
16
/// A custom notifier can be used to trigger an arbitrary mechanism to signal to a
/// reader or writer that data or buffer space is available. This could be a
/// write to a sync/async channel or a condition variable.
///
/// In this module both methods are invoked while the buffer's internal mutex
/// is held, so implementations should return quickly and must not call back
/// into the same buffer.
pub trait Notifier {
    /// Arm the notifier.
    ///
    /// Called by the side that found nothing to do (no data for a reader, no
    /// space for the writer) when it asked for a slice with `arm == true`.
    fn arm(&mut self);
    /// Deliver a notification if one was requested.
    ///
    /// The implementation must
    /// - only notify if armed
    /// - notify
    /// - unarm
    ///
    /// That is: do nothing when the notifier is not armed; otherwise signal
    /// the waiting side and reset to the unarmed state.
    fn notify(&mut self);
}
29
/// Custom metadata to annotate items.
///
/// One container is kept per reader; the writer feeds it on `produce` and the
/// reader prunes it on `consume`.
pub trait Metadata {
    /// A single metadata entry. It has to be `Clone` because the writer hands
    /// a copy of the produced entries to every reader.
    type Item: Clone;

    /// Create metadata container.
    fn new() -> Self;
    /// Add metadata, applying `offset` shift to items.
    ///
    /// The writer passes as `offset` the number of items that were already
    /// available to this reader before the newly produced items.
    fn add(&mut self, offset: usize, tags: Vec<Self::Item>);
    /// Get metadata.
    ///
    /// Called every time a reader queries its readable slice.
    fn get(&self) -> Vec<Self::Item>;
    /// Prune metadata, i.e., delete consumed [items](Self::Item) and update offsets for the remaining.
    ///
    /// `items` is the number of buffer items the reader just consumed.
    fn consume(&mut self, items: usize);
}
43
44/// Void implementation for the [Metadata] trait for buffers that don't use metadata.
45pub struct NoMetadata;
46impl Metadata for NoMetadata {
47    type Item = ();
48
49    fn new() -> Self {
50        Self
51    }
52    fn add(&mut self, _offset: usize, _tags: Vec<Self::Item>) {}
53    fn get(&self) -> Vec<Self::Item> {
54        Vec::new()
55    }
56    fn consume(&mut self, _items: usize) {}
57}
58
59/// Gerneric Circular Buffer Constructor
60pub struct Circular;
61
62impl Circular {
63    /// Create a buffer that can hold at least `min_items` items of type `T`.
64    ///
65    /// The size is the least common multiple of the page size and the size of `T`.
66    pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
67    where
68        N: Notifier,
69        M: Metadata,
70    {
71        let buffer = match DoubleMappedBuffer::new(min_items) {
72            Ok(buffer) => Arc::new(buffer),
73            Err(e) => return Err(CircularError::Allocation(e)),
74        };
75
76        let state = Arc::new(Mutex::new(State {
77            writer_offset: 0,
78            writer_ab: false,
79            writer_done: false,
80            readers: Slab::new(),
81        }));
82
83        let writer = Writer {
84            buffer,
85            state,
86            last_space: 0,
87        };
88
89        Ok(writer)
90    }
91}
92
/// Bookkeeping shared between the [Writer] and all of its [Reader]s.
///
/// It is only ever accessed through the `Arc<Mutex<..>>` that both sides hold.
struct State<N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Position, in items, at which the writer will write next. Kept in
    /// `0..capacity` by wrapping on `produce`.
    writer_offset: usize,
    /// Flipped every time `writer_offset` wraps around. Compared against a
    /// reader's `ab` flag to tell "buffer full" from "buffer empty" when both
    /// offsets are equal.
    writer_ab: bool,
    /// Set when the [Writer] is dropped; lets readers report the end of the
    /// stream once they have drained the buffer.
    writer_done: bool,
    /// Per-reader state, keyed by the id stored in each [Reader].
    readers: Slab<ReaderState<N, M>>,
}
/// State the buffer keeps for one attached reader.
struct ReaderState<N, M> {
    /// Flipped every time `offset` wraps around; see `State::writer_ab`.
    ab: bool,
    /// Position, in items, of the next item this reader will read.
    offset: usize,
    /// Armed by the reader when it finds no data; notified by the writer when
    /// it produces items or is dropped.
    reader_notifier: N,
    /// Armed by the writer when this reader leaves it no space; notified by
    /// the reader when it consumes items or is dropped.
    writer_notifier: N,
    /// Metadata for the items this reader has not consumed yet.
    meta: M,
}
110
/// Writer for a generic circular buffer with items of type `T` and [Notifier] of type `N`.
///
/// `M` is the [Metadata] container kept for every reader. Dropping the writer
/// marks the stream as finished and notifies all readers.
pub struct Writer<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Length of the slice most recently handed out by `slice`, reduced by
    /// everything produced since; upper bound for the next `produce`.
    last_space: usize,
    /// Buffer shared with all readers.
    buffer: Arc<DoubleMappedBuffer<T>>,
    /// Offsets and per-reader bookkeeping shared with all readers.
    state: Arc<Mutex<State<N, M>>>,
}
121
122impl<T, N, M> Writer<T, N, M>
123where
124    N: Notifier,
125    M: Metadata,
126{
127    /// Add a [Reader] to the buffer.
128    pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
129        let mut state = self.state.lock().unwrap();
130        let reader_state = ReaderState {
131            ab: state.writer_ab,
132            offset: state.writer_offset,
133            reader_notifier,
134            writer_notifier,
135            meta: M::new(),
136        };
137        let id = state.readers.insert(reader_state);
138
139        Reader {
140            id,
141            last_space: 0,
142            buffer: self.buffer.clone(),
143            state: self.state.clone(),
144        }
145    }
146
147    fn space_and_offset(&self, arm: bool) -> (usize, usize) {
148        let mut state = self.state.lock().unwrap();
149        let capacity = self.buffer.capacity();
150        let w_off = state.writer_offset;
151        let w_ab = state.writer_ab;
152
153        let mut space = capacity;
154
155        for (_, reader) in state.readers.iter_mut() {
156            let r_off = reader.offset;
157            let r_ab = reader.ab;
158
159            let s = if w_off > r_off {
160                r_off + capacity - w_off
161            } else if w_off < r_off {
162                r_off - w_off
163            } else if r_ab == w_ab {
164                capacity
165            } else {
166                0
167            };
168
169            space = std::cmp::min(space, s);
170
171            if s == 0 && arm {
172                reader.writer_notifier.arm();
173                break;
174            }
175            if s == 0 {
176                break;
177            }
178        }
179
180        (space, w_off)
181    }
182
183    /// Get a slice for the output buffer space. Might be empty.
184    pub fn slice(&mut self, arm: bool) -> &mut [T] {
185        let (space, offset) = self.space_and_offset(arm);
186        self.last_space = space;
187        unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
188    }
189
190    /// Indicates that `n` items were written to the output buffer.
191    ///
192    /// It is ok if `n` is zero.
193    ///
194    /// # Panics
195    ///
196    /// If produced more than space was available in the last provided slice.
197    pub fn produce(&mut self, n: usize, meta: Vec<M::Item>) {
198        if n == 0 {
199            return;
200        }
201
202        debug_assert!(self.space_and_offset(false).0 >= n);
203
204        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
205        self.last_space -= n;
206
207        let mut state = self.state.lock().unwrap();
208
209        let w_off = state.writer_offset;
210        let w_ab = state.writer_ab;
211        let capacity = self.buffer.capacity();
212
213        for (_, r) in state.readers.iter_mut() {
214            let r_off = r.offset;
215            let r_ab = r.ab;
216
217            let space = if r_off > w_off {
218                w_off + capacity - r_off
219            } else if r_off < w_off {
220                w_off - r_off
221            } else if r_ab == w_ab {
222                0
223            } else {
224                capacity
225            };
226
227            r.meta.add(space, meta.clone());
228            r.reader_notifier.notify();
229        }
230
231        if state.writer_offset + n >= self.buffer.capacity() {
232            state.writer_ab = !state.writer_ab;
233        }
234        state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
235    }
236}
237
238impl<T, N, M> Drop for Writer<T, N, M>
239where
240    N: Notifier,
241    M: Metadata,
242{
243    fn drop(&mut self) {
244        let mut state = self.state.lock().unwrap();
245        state.writer_done = true;
246        for (_, r) in state.readers.iter_mut() {
247            r.reader_notifier.notify();
248        }
249    }
250}
251
/// Reader for a generic circular buffer with items of type `T` and [Notifier] of type `N`.
///
/// Created with [`Writer::add_reader`]. `M` is the [Metadata] container that
/// holds the tags of the items this reader has not consumed yet. Dropping the
/// reader removes its bookkeeping entry and notifies the writer.
pub struct Reader<T, N, M>
where
    N: Notifier,
    M: Metadata,
{
    /// Key of this reader's entry in the shared reader slab.
    id: usize,
    /// Length of the slice most recently handed out by `slice`, reduced by
    /// everything consumed since; upper bound for the next `consume`.
    last_space: usize,
    /// Buffer shared with the writer and the other readers.
    buffer: Arc<DoubleMappedBuffer<T>>,
    /// Offsets and per-reader bookkeeping shared with the writer.
    state: Arc<Mutex<State<N, M>>>,
}
263
264impl<T, N, M> Reader<T, N, M>
265where
266    N: Notifier,
267    M: Metadata,
268{
269    fn space_and_offset_and_meta(&self, arm: bool) -> (usize, usize, bool, Vec<M::Item>) {
270        let mut state = self.state.lock().unwrap();
271
272        let capacity = self.buffer.capacity();
273        let done = state.writer_done;
274        let w_off = state.writer_offset;
275        let w_ab = state.writer_ab;
276
277        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
278        let r_off = my.offset;
279        let r_ab = my.ab;
280
281        let space = if r_off > w_off {
282            w_off + capacity - r_off
283        } else if r_off < w_off {
284            w_off - r_off
285        } else if r_ab == w_ab {
286            0
287        } else {
288            capacity
289        };
290
291        if space == 0 && arm {
292            my.reader_notifier.arm();
293        }
294
295        (space, r_off, done, my.meta.get())
296    }
297
298    /// Get a slice with the items available to read.
299    ///
300    /// Returns `None` if the reader was dropped and all data was read.
301    pub fn slice(&mut self, arm: bool) -> Option<(&[T], Vec<M::Item>)> {
302        let (space, offset, done, tags) = self.space_and_offset_and_meta(arm);
303        self.last_space = space;
304        if space == 0 && done {
305            None
306        } else {
307            unsafe { Some((&self.buffer.slice_with_offset(offset)[0..space], tags)) }
308        }
309    }
310
311    /// Indicates that `n` items were read.
312    ///
313    /// # Panics
314    ///
315    /// If consumed more than space was available in the last provided slice.
316    pub fn consume(&mut self, n: usize) {
317        if n == 0 {
318            return;
319        }
320
321        debug_assert!(self.space_and_offset_and_meta(false).0 >= n);
322
323        assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
324        self.last_space -= n;
325
326        let mut state = self.state.lock().unwrap();
327        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
328
329        my.meta.consume(n);
330
331        if my.offset + n >= self.buffer.capacity() {
332            my.ab = !my.ab;
333        }
334        my.offset = (my.offset + n) % self.buffer.capacity();
335
336        my.writer_notifier.notify();
337    }
338}
339
340impl<T, N, M> Drop for Reader<T, N, M>
341where
342    N: Notifier,
343    M: Metadata,
344{
345    fn drop(&mut self) {
346        let mut state = self.state.lock().unwrap();
347        let mut s = state.readers.remove(self.id);
348        s.writer_notifier.notify();
349    }
350}