direct_ring_buffer/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{
4    ptr::NonNull,
5    slice::{from_raw_parts, from_raw_parts_mut},
6    sync::{
7        atomic::{AtomicUsize, Ordering},
8        Arc,
9    },
10};
11
12/// Producer part of the ring buffer.
13pub struct Producer<T> {
14    buffer: Arc<DirectRingBuffer<T>>,
15    index: usize,
16}
17
18impl<T> Producer<T> {
19    /// Returns the number of elements available for writing.
20    ///
21    /// This method returns the number of elements available for writing.
22    ///
23    /// # Returns
24    ///
25    /// Number of elements available for writing.
26    ///
27    /// # Example
28    ///
29    /// ```
30    /// use direct_ring_buffer::{create_ring_buffer};
31    ///
32    /// let (producer, _) = create_ring_buffer::<u8>(5);
33    /// assert_eq!(producer.available(), 5);
34    /// ```
35    pub fn available(&self) -> usize {
36        self.buffer.available_write()
37    }
38
39    /// Writes elements to the ring buffer.
40    ///
41    /// This method writes elements to the ring buffer using the provided closure.
42    /// The closure `f` receives a mutable slice of writable elements and the
43    /// current offset within the write operation, and it should return the number
44    /// of elements written. The `max_size` parameter specifies the maximum number of
45    /// elements to write. If `None`, the method attempts to write as many elements
46    /// as available.
47    ///
48    /// If there is no space available for writing, the function returns immediately
49    /// without blocking, and the closure is not called.
50    ///
51    /// # Arguments
52    ///
53    /// * `f` - A closure for writing elements. It takes a mutable slice of writable
54    ///   elements and an offset, and returns the number of elements written. The
55    ///   closure will not be called if there are no writable elements. If the
56    ///   buffer wraps around, the closure may be called twice. The slice passed
57    ///   to the closure contains the currently writable elements. The offset is
58    ///   `0` for the first call and increases by the number of elements written
59    ///   in subsequent calls. If the closure returns a value less than the
60    ///   length of the slice passed to it, it is considered as an interruption
61    ///   of the write operation by that number of elements.
62    /// * `max_size` - An optional parameter specifying the maximum number of elements
63    ///   to write. If `None`, the method will write up to the number of
64    ///   available elements.
65    ///
66    /// # Returns
67    ///
68    /// The number of elements written.
69    ///
70    /// # Example
71    ///
72    /// ```
73    /// use direct_ring_buffer::{create_ring_buffer, Producer};
74    ///
75    /// let (mut producer, _) = create_ring_buffer::<u8>(5);
76    /// producer.write_slices(|data, _| {
77    ///     data[..3].copy_from_slice(&[1, 2, 3]);
78    ///     3
79    /// }, None);
80    ///
81    /// producer.write_slices(|data, _| {
82    ///     data[..2].copy_from_slice(&[4, 5]);
83    ///     2
84    /// }, None);
85    /// assert_eq!(producer.available(), 0);
86    /// ```
87    pub fn write_slices(
88        &mut self,
89        mut f: impl FnMut(&mut [T], usize) -> usize,
90        max_size: Option<usize>,
91    ) -> usize {
92        let available = self.available();
93        self.buffer.process_slices(
94            &mut self.index,
95            available,
96            |buf, len, process_offset| {
97                f(
98                    // No boundaries are crossed.
99                    unsafe { from_raw_parts_mut(buf, len) },
100                    process_offset,
101                )
102            },
103            max_size,
104            |atomic, processed| {
105                atomic.fetch_add(processed, Ordering::Release);
106            },
107        )
108    }
109
110    /// Writes elements to the ring buffer. (Deprecated)
111    ///
112    /// This method writes elements to the ring buffer using the provided closure.
113    ///
114    /// # Arguments
115    ///
116    /// * `f` - A closure for writing elements.
117    /// * `max_size` - An optional parameter specifying the maximum number of elements to write.
118    ///
119    /// # Returns
120    ///
121    /// The number of elements written.
122    #[deprecated(note = "Please use `write_slices` instead")]
123    pub fn write(
124        &mut self,
125        f: impl FnMut(&mut [T], usize) -> usize,
126        max_size: Option<usize>,
127    ) -> usize {
128        self.write_slices(f, max_size)
129    }
130
131    /// Writes a single element to the ring buffer.
132    ///
133    /// This method writes a single element to the ring buffer. If the buffer is full,
134    /// it returns `false`.
135    ///
136    /// # Arguments
137    ///
138    /// * `value` - The element to write to the buffer.
139    ///
140    /// # Returns
141    ///
142    /// `true` if the element was successfully written, `false` if the buffer is full.
143    ///
144    /// # Example
145    ///
146    /// ```
147    /// use direct_ring_buffer::{create_ring_buffer};
148    ///
149    /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
150    /// assert!(producer.write_element(1));
151    /// assert!(producer.write_element(2));
152    /// assert!(producer.write_element(3));
153    /// assert!(producer.write_element(4));
154    /// assert!(producer.write_element(5));
155    /// assert!(!producer.write_element(6)); // Buffer is full
156    /// assert_eq!(producer.available(), 0);
157    /// assert_eq!(consumer.available(), 5);
158    /// consumer.read_slices(|data, offset| {
159    ///     assert_eq!(data, &([1, 2, 3, 4, 5][offset..offset + data.len()]));
160    ///     data.len()
161    /// }, None);
162    /// assert_eq!(consumer.available(), 0);
163    /// assert_eq!(producer.available(), 5);
164    /// ```
165    pub fn write_element(&mut self, value: T) -> bool {
166        self.buffer.write_element(&mut self.index, value)
167    }
168}
169
170unsafe impl<T> Send for Producer<T> {}
171
172/// Consumer part of the ring buffer.
173pub struct Consumer<T> {
174    buffer: Arc<DirectRingBuffer<T>>,
175    index: usize,
176}
177
178impl<T> Consumer<T> {
179    /// Returns the number of elements available for reading.
180    ///
181    /// This method returns the number of elements available for reading.
182    ///
183    /// # Returns
184    ///
185    /// Number of elements available for reading.
186    ///
187    /// # Example
188    /// ```
189    /// use direct_ring_buffer::{create_ring_buffer};
190    ///
191    /// let (_, consumer) = create_ring_buffer::<u8>(5);
192    /// assert_eq!(consumer.available(), 0);
193    /// ```
194    pub fn available(&self) -> usize {
195        self.buffer.available_read()
196    }
197
198    /// Reads elements from the ring buffer.
199    ///
200    /// This method reads elements from the ring buffer using the provided closure.
201    /// The closure `f` receives a slice of readable elements and the current
202    /// offset within the read operation, and it should return the number of elements
203    /// read. The `max_size` parameter specifies the maximum number of elements to
204    /// read. If `None`, the method attempts to read as many elements as available.
205    ///
206    /// If there are no elements available for reading, the function returns
207    /// immediately without blocking, and the closure is not called.
208    ///
209    /// # Arguments
210    ///
211    /// * `f` - A closure that processes the readable elements. It takes a reference
212    ///   to a slice of readable elements and an offset as arguments, and
213    ///   returns the number of elements read. The closure will not be called if
214    ///   there are no readable elements. If the buffer wraps around, the closure
215    ///   may be called twice. The slice passed to the closure contains the
216    ///   currently accessible elements. The offset is `0` for the first call
217    ///   and increases by the number of elements read in subsequent calls. If
218    ///   the closure returns a value less than the length of the slice passed to
219    ///   it, it is considered as an interruption of the read operation by that
220    ///   number of elements.
221    /// * `max_size` - An optional parameter specifying the maximum number of elements
222    ///   to read. If `None`, the method will read up to the number of
223    ///   available elements.
224    ///
225    /// # Returns
226    ///
227    /// The number of elements read.
228    ///
229    /// # Example
230    ///
231    /// ```
232    /// use direct_ring_buffer::{create_ring_buffer};
233    ///
234    /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
235    /// producer.write_slices(|data, offset| {
236    ///     assert_eq!(data.len(), 5);
237    ///     data[..2].copy_from_slice(&[1, 2]);
238    ///     2
239    /// }, None);
240    /// consumer.read_slices(|data, offset| {
241    ///     assert_eq!(data.len(), 2);
242    ///     assert_eq!(offset, 0);
243    ///     2
244    /// }, None);
245    /// producer.write_slices(|data, offset| {
246    ///     data.copy_from_slice(&([3, 4, 5, 6, 7][offset..offset + data.len()]));
247    ///     data.len()
248    /// }, None);
249    /// consumer.read_slices(|data, offset| {
250    ///     assert_eq!(data, &([3, 4, 5, 6, 7][offset..offset + data.len()]));
251    ///     data.len()
252    /// }, None);
253    ///
254    /// ```
255    pub fn read_slices(
256        &mut self,
257        mut f: impl FnMut(&[T], usize) -> usize,
258        max_size: Option<usize>,
259    ) -> usize {
260        let available = self.available();
261        self.buffer.process_slices(
262            &mut self.index,
263            available,
264            |buf, len, process_offset| {
265                f(
266                    // No boundaries are crossed.
267                    unsafe { from_raw_parts(buf, len) },
268                    process_offset,
269                )
270            },
271            max_size,
272            |atomic, processed| {
273                atomic.fetch_sub(processed, Ordering::Release);
274            },
275        )
276    }
277
278    /// Reads elements from the ring buffer. (Deprecated)
279    ///
280    /// This method reads elements from the ring buffer using the provided closure.
281    ///
282    /// # Arguments
283    ///
284    /// * `f` - A closure that processes the readable elements.
285    /// * `max_size` - An optional parameter specifying the maximum number of elements to read.
286    ///
287    /// # Returns
288    ///
289    /// The number of elements read.
290    #[deprecated(note = "Please use `read_slices` instead")]
291    pub fn read(&mut self, f: impl FnMut(&[T], usize) -> usize, max_size: Option<usize>) -> usize {
292        self.read_slices(f, max_size)
293    }
294
295    /// Reads a single element from the ring buffer.
296    ///
297    /// This method reads a single element from the ring buffer and returns it. If the
298    /// buffer is empty, it returns `None`.
299    ///
300    /// # Returns
301    ///
302    /// An `Option` containing the element if available, or `None` if the buffer is
303    /// empty.
304    ///
305    /// # Example
306    ///
307    /// ```
308    /// use direct_ring_buffer::{create_ring_buffer};
309    ///
310    /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
311    /// producer.write_slices(|data, offset| {
312    ///     data.copy_from_slice(&([3, 4, 5, 6, 7][offset..offset + data.len()]));
313    ///     data.len()
314    /// }, None);
315    /// assert_eq!(consumer.read_element(), Some(3));
316    /// assert_eq!(consumer.read_element(), Some(4));
317    /// assert_eq!(consumer.read_element(), Some(5));
318    /// assert_eq!(consumer.read_element(), Some(6));
319    /// assert_eq!(consumer.read_element(), Some(7));
320    /// assert_eq!(consumer.read_element(), None);
321    /// ```
322    pub fn read_element(&mut self) -> Option<T>
323    where
324        T: Copy,
325    {
326        self.buffer.read_element(&mut self.index)
327    }
328}
329
330unsafe impl<T> Send for Consumer<T> {}
331
/// Storage and bookkeeping shared by the producer and consumer halves.
///
/// Every slot of the allocation holds an initialized `T` at all times: the
/// whole allocation is released as a `Box<[T]>` on drop, which drops each
/// element.
struct DirectRingBuffer<T> {
    /// Owning pointer to the boxed slice; turned back into a `Box` on drop.
    raw: *mut [T],
    /// Pointer to the first element, used for offset arithmetic.
    buf: NonNull<T>,
    /// Capacity of the buffer in elements.
    size: usize,
    /// Number of elements written and not yet read.
    used: AtomicUsize,
}

impl<T> DirectRingBuffer<T> {
    /// Returns the number of elements available for reading.
    #[inline]
    fn available_read(&self) -> usize {
        self.used.load(Ordering::Acquire)
    }

    /// Returns the number of elements available for writing.
    #[inline]
    fn available_write(&self) -> usize {
        self.size - self.used.load(Ordering::Acquire)
    }

    /// Returns a pointer to the slot `count` elements past the start.
    ///
    /// Callers must keep `count` within the allocation (`count <= size`).
    #[inline]
    fn ptr_at(&self, count: usize) -> *mut T {
        // SAFETY: callers only pass cursor values, which `wraparound_index`
        // keeps below `size`.
        unsafe { self.buf.as_ptr().add(count) }
    }

    /// Advances `index` by `advance`, resetting it to `0` once the end of the
    /// storage is reached.
    #[inline]
    fn wraparound_index(&self, index: &mut usize, advance: usize) {
        *index = if *index + advance >= self.size {
            0
        } else {
            *index + advance
        }
    }

    /// Reads a single element from the buffer.
    ///
    /// Returns `None` when nothing is readable; otherwise copies the element
    /// at `*index`, advances the cursor and releases the slot.
    fn read_element(&self, index: &mut usize) -> Option<T>
    where
        T: Copy,
    {
        if self.available_read() == 0 {
            return None;
        }
        // SAFETY: `*index` is below `size` and the slot holds an initialized
        // `T`; `T: Copy` means the bitwise copy leaves the slot valid.
        let value = unsafe { self.ptr_at(*index).read() };
        self.wraparound_index(index, 1);
        self.used.fetch_sub(1, Ordering::Release);
        Some(value)
    }

    /// Writes a single element to the buffer.
    ///
    /// Returns `false` when no slot is free; otherwise stores `value` at
    /// `*index`, advances the cursor and publishes the element.
    fn write_element(&self, index: &mut usize, value: T) -> bool {
        if self.available_write() == 0 {
            return false;
        }
        // SAFETY: `*index` is below `size` and the slot already holds an
        // initialized `T`, so a plain assignment is valid. Unlike
        // `ptr::write`, assignment drops the value previously stored in the
        // slot instead of leaking it.
        unsafe { *self.ptr_at(*index) = value };
        self.wraparound_index(index, 1);
        self.used.fetch_add(1, Ordering::Release);
        true
    }

    /// Read/Write common process.
    ///
    /// Walks up to `max_size` elements (never more than `available`) starting
    /// at `*index`, handing each contiguous chunk to `f` and advancing the
    /// cursor by what `f` reports.
    ///
    /// # Arguments
    ///
    /// * `index` - Cursor of the calling side; updated in place.
    /// * `available` - Number of elements the calling side may touch.
    /// * `f` - Called with `(chunk_start, chunk_len, processed_so_far)` and
    ///   returns how many elements of the chunk it handled. A value larger
    ///   than `chunk_len` is clamped to `chunk_len`, so a misbehaving closure
    ///   cannot push the fill counter outside `0..=size`.
    /// * `max_size` - Optional cap on the number of elements to process.
    /// * `update_used` - Applies the total to the fill counter (add for
    ///   writes, subtract for reads).
    ///
    /// # Returns
    ///
    /// The total number of elements processed.
    fn process_slices(
        &self,
        index: &mut usize,
        available: usize,
        mut f: impl FnMut(*mut T, usize, usize) -> usize,
        max_size: Option<usize>,
        update_used: impl FnOnce(&AtomicUsize, usize),
    ) -> usize {
        let mut total_processed = 0;
        let max_size = max_size.unwrap_or(available).min(available);

        while total_processed < max_size {
            let part_start = *index;
            // A chunk never crosses the end of the storage.
            let part_len = (self.size - part_start).min(max_size - total_processed);
            // Never trust the closure to report more than it was offered.
            let processed = f(self.ptr_at(part_start), part_len, total_processed).min(part_len);
            total_processed += processed;
            self.wraparound_index(index, processed);
            if processed < part_len {
                // Aborting the operation because the return value
                // from the closure is smaller than expected.
                break;
            }
        }
        update_used(&self.used, total_processed);
        total_processed
    }
}

impl<T> Drop for DirectRingBuffer<T> {
    /// Releases the storage, dropping every element it holds.
    fn drop(&mut self) {
        // SAFETY: `raw` is expected to come from `Box::into_raw` and is
        // reclaimed exactly once, here.
        unsafe {
            drop(Box::from_raw(self.raw));
        }
    }
}
431
432/// Creates a ring buffer with the specified size.
433///
434/// # Arguments
435///
436/// * `size` - The size of the ring buffer.
437///
438/// # Returns
439///
440/// A tuple containing a `Producer<T>` and a `Consumer<T>`.
441///
442/// # Example
443///
444/// ```
445/// use direct_ring_buffer::create_ring_buffer;
446/// let (mut producer, mut consumer) = create_ring_buffer::<u8>(10);
447/// producer.write_slices(|data, _| {
448///     data.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
449///     10
450/// }, None);
451///
452/// let mut read_data = vec![0; 10];
453/// consumer.read_slices(|data, _| {
454///     read_data[..data.len()].copy_from_slice(data);
455///     data.len()
456/// }, None);
457/// assert_eq!(read_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
458/// ```
459pub fn create_ring_buffer<T: Default>(size: usize) -> (Producer<T>, Consumer<T>) {
460    let raw = {
461        let mut vec = Vec::<T>::with_capacity(size);
462        vec.resize_with(size, T::default);
463        Box::into_raw(vec.into_boxed_slice())
464    };
465    let buffer = Arc::new(DirectRingBuffer {
466        raw,
467        buf: unsafe { NonNull::new_unchecked(raw as *mut T) },
468        size,
469        used: AtomicUsize::new(0),
470    });
471    (
472        Producer {
473            buffer: Arc::clone(&buffer),
474            index: 0,
475        },
476        Consumer { buffer, index: 0 },
477    )
478}