Skip to main content

vmcircbuffer/
generic.rs

1//! Circular Buffer with generic [Notifier] to implement custom wait/block behavior.
2
3use slab::Slab;
4use spin::Mutex;
5use std::sync::Arc;
6use thiserror::Error;
7
8use crate::double_mapped_buffer::{DoubleMappedBuffer, DoubleMappedBufferError};
9
/// Error setting up the underlying buffer.
///
/// Returned by [`Circular::with_capacity`] when the backing buffer cannot be created.
#[derive(Error, Debug)]
pub enum CircularError {
    /// Failed to allocate double mapped buffer.
    ///
    /// Carries the [`DoubleMappedBufferError`] reported by `DoubleMappedBuffer::new`.
    #[error("Failed to allocate double mapped buffer.")]
    Allocation(DoubleMappedBufferError),
}
17
18pub use crate::{Metadata, NoMetadata, Notifier};
19
/// Generic Circular Buffer Constructor
///
/// Unit struct that carries no data; it only hosts [`Circular::with_capacity`].
pub struct Circular;
22
23impl Circular {
24    /// Create a buffer that can hold at least `min_items` items of type `T`.
25    ///
26    /// The size is the least common multiple of the page size and the size of `T`.
27    pub fn with_capacity<T, N, M>(min_items: usize) -> Result<Writer<T, N, M>, CircularError>
28    where
29        N: Notifier,
30        M: Metadata,
31    {
32        let buffer = match DoubleMappedBuffer::new(min_items) {
33            Ok(buffer) => Arc::new(buffer),
34            Err(e) => return Err(CircularError::Allocation(e)),
35        };
36
37        let state = Arc::new(Mutex::new(State {
38            writer_offset: 0,
39            writer_ab: false,
40            writer_done: false,
41            readers: Slab::new(),
42        }));
43
44        let writer = Writer {
45            buffer,
46            state,
47            last_space: 0,
48        };
49
50        Ok(writer)
51    }
52}
53
54struct State<N, M>
55where
56    N: Notifier,
57    M: Metadata,
58{
59    writer_offset: usize,
60    writer_ab: bool,
61    writer_done: bool,
62    readers: Slab<ReaderState<N, M>>,
63}
/// State kept for one registered [`Reader`].
struct ReaderState<N, M> {
    /// Wrap-around flag of this reader; toggled whenever `offset` wraps past the capacity.
    ab: bool,
    /// Item index at which this reader consumes next.
    offset: usize,
    /// Notified by the writer after producing and when the writer is dropped. Armed by the
    /// reader when it asks for data with `arm == true` and none is available.
    reader_notifier: N,
    /// Notified by the reader after consuming and when the reader is dropped. Armed by the
    /// writer when it asks for space with `arm == true` and this reader leaves it none.
    writer_notifier: N,
    /// Metadata store of this reader; filled by the writer and drained by the reader.
    meta: M,
}
71
72/// Writer for a generic circular buffer with items of type `T` and [Notifier] of type `N`.
73pub struct Writer<T, N, M>
74where
75    N: Notifier,
76    M: Metadata,
77{
78    last_space: usize,
79    buffer: Arc<DoubleMappedBuffer<T>>,
80    state: Arc<Mutex<State<N, M>>>,
81}
82
83impl<T, N, M> Writer<T, N, M>
84where
85    N: Notifier,
86    M: Metadata,
87{
88    #[inline(always)]
89    fn writer_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
90        if w_off > r_off {
91            r_off + capacity - w_off
92        } else if w_off < r_off {
93            r_off - w_off
94        } else if r_ab == w_ab {
95            capacity
96        } else {
97            0
98        }
99    }
100
101    /// Add a [Reader] to the buffer.
102    pub fn add_reader(&self, reader_notifier: N, writer_notifier: N) -> Reader<T, N, M> {
103        let mut state = self.state.lock();
104        let reader_state = ReaderState {
105            ab: state.writer_ab,
106            offset: state.writer_offset,
107            reader_notifier,
108            writer_notifier,
109            meta: M::new(),
110        };
111        let id = state.readers.insert(reader_state);
112
113        Reader {
114            id,
115            last_space: 0,
116            buffer: self.buffer.clone(),
117            state: self.state.clone(),
118        }
119    }
120
121    #[inline(always)]
122    fn space_and_offset_locked(
123        state: &mut State<N, M>,
124        capacity: usize,
125        arm: bool,
126    ) -> (usize, usize) {
127        let w_off = state.writer_offset;
128        let w_ab = state.writer_ab;
129
130        let mut space = capacity;
131
132        for (_, reader) in state.readers.iter_mut() {
133            let s = Self::writer_space(capacity, w_off, w_ab, reader.offset, reader.ab);
134
135            space = std::cmp::min(space, s);
136
137            if s == 0 && arm {
138                reader.writer_notifier.arm();
139                break;
140            }
141            if s == 0 {
142                break;
143            }
144        }
145
146        (space, w_off)
147    }
148
149    /// Get a slice for the output buffer space. Might be empty.
150    pub fn slice(&mut self, arm: bool) -> &mut [T] {
151        let mut state = self.state.lock();
152        let (space, offset) =
153            Self::space_and_offset_locked(&mut state, self.buffer.capacity(), arm);
154        self.last_space = space;
155        unsafe { &mut self.buffer.slice_with_offset_mut(offset)[0..space] }
156    }
157
158    /// Indicates that `n` items were written to the output buffer.
159    ///
160    /// It is ok if `n` is zero.
161    ///
162    /// # Panics
163    ///
164    /// If produced more than space was available in the last provided slice.
165    pub fn produce(&mut self, n: usize, meta: &[M::Item]) {
166        if n == 0 {
167            return;
168        }
169
170        assert!(n <= self.last_space, "vmcircbuffer: produced too much");
171        self.last_space -= n;
172
173        let mut state = self.state.lock();
174        let capacity = self.buffer.capacity();
175
176        debug_assert!(Self::space_and_offset_locked(&mut state, capacity, false).0 >= n);
177
178        let w_off = state.writer_offset;
179        let w_ab = state.writer_ab;
180
181        for (_, r) in state.readers.iter_mut() {
182            let space = Reader::<T, N, M>::reader_space(capacity, w_off, w_ab, r.offset, r.ab);
183
184            if !meta.is_empty() {
185                r.meta.add_from_slice(space, meta);
186            }
187            r.reader_notifier.notify();
188        }
189
190        if state.writer_offset + n >= self.buffer.capacity() {
191            state.writer_ab = !state.writer_ab;
192        }
193        state.writer_offset = (state.writer_offset + n) % self.buffer.capacity();
194    }
195}
196
197impl<T, N, M> Drop for Writer<T, N, M>
198where
199    N: Notifier,
200    M: Metadata,
201{
202    fn drop(&mut self) {
203        let mut state = self.state.lock();
204        state.writer_done = true;
205        for (_, r) in state.readers.iter_mut() {
206            r.reader_notifier.notify();
207        }
208    }
209}
210
211/// Reader for a generic circular buffer with items of type `T` and [Notifier] of type `N`.
212pub struct Reader<T, N, M>
213where
214    N: Notifier,
215    M: Metadata,
216{
217    id: usize,
218    last_space: usize,
219    buffer: Arc<DoubleMappedBuffer<T>>,
220    state: Arc<Mutex<State<N, M>>>,
221}
222
223impl<T, N, M> Reader<T, N, M>
224where
225    N: Notifier,
226    M: Metadata,
227{
228    #[inline(always)]
229    fn reader_space(capacity: usize, w_off: usize, w_ab: bool, r_off: usize, r_ab: bool) -> usize {
230        if r_off > w_off {
231            w_off + capacity - r_off
232        } else if r_off < w_off {
233            w_off - r_off
234        } else if r_ab == w_ab {
235            0
236        } else {
237            capacity
238        }
239    }
240
241    #[inline(always)]
242    fn space_and_offset_locked(
243        state: &mut State<N, M>,
244        id: usize,
245        capacity: usize,
246        arm: bool,
247    ) -> (usize, usize, bool) {
248        let done = state.writer_done;
249        let w_off = state.writer_offset;
250        let w_ab = state.writer_ab;
251
252        let my = unsafe { state.readers.get_unchecked_mut(id) };
253        let space = Self::reader_space(capacity, w_off, w_ab, my.offset, my.ab);
254
255        if space == 0 && arm {
256            my.reader_notifier.arm();
257        }
258
259        (space, my.offset, done)
260    }
261
262    /// Get a slice without fetching metadata.
263    pub fn slice(&mut self, arm: bool) -> Option<&[T]> {
264        let mut state = self.state.lock();
265        let (space, offset, done) =
266            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
267        self.last_space = space;
268
269        if space == 0 && done {
270            return None;
271        }
272
273        unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
274    }
275
276    /// Get a slice and copy metadata into `out` in one call.
277    pub fn slice_with_metadata_into(&mut self, arm: bool, out: &mut Vec<M::Item>) -> Option<&[T]> {
278        let mut state = self.state.lock();
279        let (space, offset, done) =
280            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), arm);
281        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
282
283        my.meta.get_into(out);
284        self.last_space = space;
285
286        if space == 0 && done {
287            out.clear();
288            return None;
289        }
290
291        unsafe { Some(&self.buffer.slice_with_offset(offset)[0..space]) }
292    }
293
294    /// Indicates that `n` items were read.
295    ///
296    /// # Panics
297    ///
298    /// If consumed more than space was available in the last provided slice.
299    pub fn consume(&mut self, n: usize) {
300        if n == 0 {
301            return;
302        }
303
304        assert!(n <= self.last_space, "vmcircbuffer: consumed too much!");
305        self.last_space -= n;
306
307        let mut state = self.state.lock();
308        debug_assert!(
309            Self::space_and_offset_locked(&mut state, self.id, self.buffer.capacity(), false).0
310                >= n
311        );
312        let my = unsafe { state.readers.get_unchecked_mut(self.id) };
313
314        my.meta.consume(n);
315
316        if my.offset + n >= self.buffer.capacity() {
317            my.ab = !my.ab;
318        }
319        my.offset = (my.offset + n) % self.buffer.capacity();
320
321        my.writer_notifier.notify();
322    }
323}
324
325impl<T, N, M> Drop for Reader<T, N, M>
326where
327    N: Notifier,
328    M: Metadata,
329{
330    fn drop(&mut self) {
331        let mut state = self.state.lock();
332        let mut s = state.readers.remove(self.id);
333        s.writer_notifier.notify();
334    }
335}