direct_ring_buffer/lib.rs
1#![doc = include_str!("../README.md")]
2
3use std::{
4 ptr::NonNull,
5 slice::{from_raw_parts, from_raw_parts_mut},
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10};
11
12/// Producer part of the ring buffer.
13pub struct Producer<T> {
14 buffer: Arc<DirectRingBuffer<T>>,
15 index: usize,
16}
17
18impl<T> Producer<T> {
19 /// Returns the number of elements available for writing.
20 ///
21 /// This method returns the number of elements available for writing.
22 ///
23 /// # Returns
24 ///
25 /// Number of elements available for writing.
26 ///
27 /// # Example
28 ///
29 /// ```
30 /// use direct_ring_buffer::{create_ring_buffer};
31 ///
32 /// let (producer, _) = create_ring_buffer::<u8>(5);
33 /// assert_eq!(producer.available(), 5);
34 /// ```
35 pub fn available(&self) -> usize {
36 self.buffer.available_write()
37 }
38
39 /// Writes elements to the ring buffer.
40 ///
41 /// This method writes elements to the ring buffer using the provided closure.
42 /// The closure `f` receives a mutable slice of writable elements and the
43 /// current offset within the write operation, and it should return the number
44 /// of elements written. The `max_size` parameter specifies the maximum number of
45 /// elements to write. If `None`, the method attempts to write as many elements
46 /// as available.
47 ///
48 /// If there is no space available for writing, the function returns immediately
49 /// without blocking, and the closure is not called.
50 ///
51 /// # Arguments
52 ///
53 /// * `f` - A closure for writing elements. It takes a mutable slice of writable
54 /// elements and an offset, and returns the number of elements written. The
55 /// closure will not be called if there are no writable elements. If the
56 /// buffer wraps around, the closure may be called twice. The slice passed
57 /// to the closure contains the currently writable elements. The offset is
58 /// `0` for the first call and increases by the number of elements written
59 /// in subsequent calls. If the closure returns a value less than the
60 /// length of the slice passed to it, it is considered as an interruption
61 /// of the write operation by that number of elements.
62 /// * `max_size` - An optional parameter specifying the maximum number of elements
63 /// to write. If `None`, the method will write up to the number of
64 /// available elements.
65 ///
66 /// # Returns
67 ///
68 /// The number of elements written.
69 ///
70 /// # Example
71 ///
72 /// ```
73 /// use direct_ring_buffer::{create_ring_buffer, Producer};
74 ///
75 /// let (mut producer, _) = create_ring_buffer::<u8>(5);
76 /// producer.write_slices(|data, _| {
77 /// data[..3].copy_from_slice(&[1, 2, 3]);
78 /// 3
79 /// }, None);
80 ///
81 /// producer.write_slices(|data, _| {
82 /// data[..2].copy_from_slice(&[4, 5]);
83 /// 2
84 /// }, None);
85 /// assert_eq!(producer.available(), 0);
86 /// ```
87 pub fn write_slices(
88 &mut self,
89 mut f: impl FnMut(&mut [T], usize) -> usize,
90 max_size: Option<usize>,
91 ) -> usize {
92 let available = self.available();
93 self.buffer.process_slices(
94 &mut self.index,
95 available,
96 |buf, len, process_offset| {
97 f(
98 // No boundaries are crossed.
99 unsafe { from_raw_parts_mut(buf, len) },
100 process_offset,
101 )
102 },
103 max_size,
104 |atomic, processed| {
105 atomic.fetch_add(processed, Ordering::Release);
106 },
107 )
108 }
109
110 /// Writes elements to the ring buffer. (Deprecated)
111 ///
112 /// This method writes elements to the ring buffer using the provided closure.
113 ///
114 /// # Arguments
115 ///
116 /// * `f` - A closure for writing elements.
117 /// * `max_size` - An optional parameter specifying the maximum number of elements to write.
118 ///
119 /// # Returns
120 ///
121 /// The number of elements written.
122 #[deprecated(note = "Please use `write_slices` instead")]
123 pub fn write(
124 &mut self,
125 f: impl FnMut(&mut [T], usize) -> usize,
126 max_size: Option<usize>,
127 ) -> usize {
128 self.write_slices(f, max_size)
129 }
130
131 /// Writes a single element to the ring buffer.
132 ///
133 /// This method writes a single element to the ring buffer. If the buffer is full,
134 /// it returns `false`.
135 ///
136 /// # Arguments
137 ///
138 /// * `value` - The element to write to the buffer.
139 ///
140 /// # Returns
141 ///
142 /// `true` if the element was successfully written, `false` if the buffer is full.
143 ///
144 /// # Example
145 ///
146 /// ```
147 /// use direct_ring_buffer::{create_ring_buffer};
148 ///
149 /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
150 /// assert!(producer.write_element(1));
151 /// assert!(producer.write_element(2));
152 /// assert!(producer.write_element(3));
153 /// assert!(producer.write_element(4));
154 /// assert!(producer.write_element(5));
155 /// assert!(!producer.write_element(6)); // Buffer is full
156 /// assert_eq!(producer.available(), 0);
157 /// assert_eq!(consumer.available(), 5);
158 /// consumer.read_slices(|data, offset| {
159 /// assert_eq!(data, &([1, 2, 3, 4, 5][offset..offset + data.len()]));
160 /// data.len()
161 /// }, None);
162 /// assert_eq!(consumer.available(), 0);
163 /// assert_eq!(producer.available(), 5);
164 /// ```
165 pub fn write_element(&mut self, value: T) -> bool {
166 self.buffer.write_element(&mut self.index, value)
167 }
168}
169
170unsafe impl<T> Send for Producer<T> {}
171
172/// Consumer part of the ring buffer.
173pub struct Consumer<T> {
174 buffer: Arc<DirectRingBuffer<T>>,
175 index: usize,
176}
177
178impl<T> Consumer<T> {
179 /// Returns the number of elements available for reading.
180 ///
181 /// This method returns the number of elements available for reading.
182 ///
183 /// # Returns
184 ///
185 /// Number of elements available for reading.
186 ///
187 /// # Example
188 /// ```
189 /// use direct_ring_buffer::{create_ring_buffer};
190 ///
191 /// let (_, consumer) = create_ring_buffer::<u8>(5);
192 /// assert_eq!(consumer.available(), 0);
193 /// ```
194 pub fn available(&self) -> usize {
195 self.buffer.available_read()
196 }
197
198 /// Reads elements from the ring buffer.
199 ///
200 /// This method reads elements from the ring buffer using the provided closure.
201 /// The closure `f` receives a slice of readable elements and the current
202 /// offset within the read operation, and it should return the number of elements
203 /// read. The `max_size` parameter specifies the maximum number of elements to
204 /// read. If `None`, the method attempts to read as many elements as available.
205 ///
206 /// If there are no elements available for reading, the function returns
207 /// immediately without blocking, and the closure is not called.
208 ///
209 /// # Arguments
210 ///
211 /// * `f` - A closure that processes the readable elements. It takes a reference
212 /// to a slice of readable elements and an offset as arguments, and
213 /// returns the number of elements read. The closure will not be called if
214 /// there are no readable elements. If the buffer wraps around, the closure
215 /// may be called twice. The slice passed to the closure contains the
216 /// currently accessible elements. The offset is `0` for the first call
217 /// and increases by the number of elements read in subsequent calls. If
218 /// the closure returns a value less than the length of the slice passed to
219 /// it, it is considered as an interruption of the read operation by that
220 /// number of elements.
221 /// * `max_size` - An optional parameter specifying the maximum number of elements
222 /// to read. If `None`, the method will read up to the number of
223 /// available elements.
224 ///
225 /// # Returns
226 ///
227 /// The number of elements read.
228 ///
229 /// # Example
230 ///
231 /// ```
232 /// use direct_ring_buffer::{create_ring_buffer};
233 ///
234 /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
235 /// producer.write_slices(|data, offset| {
236 /// assert_eq!(data.len(), 5);
237 /// data[..2].copy_from_slice(&[1, 2]);
238 /// 2
239 /// }, None);
240 /// consumer.read_slices(|data, offset| {
241 /// assert_eq!(data.len(), 2);
242 /// assert_eq!(offset, 0);
243 /// 2
244 /// }, None);
245 /// producer.write_slices(|data, offset| {
246 /// data.copy_from_slice(&([3, 4, 5, 6, 7][offset..offset + data.len()]));
247 /// data.len()
248 /// }, None);
249 /// consumer.read_slices(|data, offset| {
250 /// assert_eq!(data, &([3, 4, 5, 6, 7][offset..offset + data.len()]));
251 /// data.len()
252 /// }, None);
253 ///
254 /// ```
255 pub fn read_slices(
256 &mut self,
257 mut f: impl FnMut(&[T], usize) -> usize,
258 max_size: Option<usize>,
259 ) -> usize {
260 let available = self.available();
261 self.buffer.process_slices(
262 &mut self.index,
263 available,
264 |buf, len, process_offset| {
265 f(
266 // No boundaries are crossed.
267 unsafe { from_raw_parts(buf, len) },
268 process_offset,
269 )
270 },
271 max_size,
272 |atomic, processed| {
273 atomic.fetch_sub(processed, Ordering::Release);
274 },
275 )
276 }
277
278 /// Reads elements from the ring buffer. (Deprecated)
279 ///
280 /// This method reads elements from the ring buffer using the provided closure.
281 ///
282 /// # Arguments
283 ///
284 /// * `f` - A closure that processes the readable elements.
285 /// * `max_size` - An optional parameter specifying the maximum number of elements to read.
286 ///
287 /// # Returns
288 ///
289 /// The number of elements read.
290 #[deprecated(note = "Please use `read_slices` instead")]
291 pub fn read(&mut self, f: impl FnMut(&[T], usize) -> usize, max_size: Option<usize>) -> usize {
292 self.read_slices(f, max_size)
293 }
294
295 /// Reads a single element from the ring buffer.
296 ///
297 /// This method reads a single element from the ring buffer and returns it. If the
298 /// buffer is empty, it returns `None`.
299 ///
300 /// # Returns
301 ///
302 /// An `Option` containing the element if available, or `None` if the buffer is
303 /// empty.
304 ///
305 /// # Example
306 ///
307 /// ```
308 /// use direct_ring_buffer::{create_ring_buffer};
309 ///
310 /// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
311 /// producer.write_slices(|data, offset| {
312 /// data.copy_from_slice(&([3, 4, 5, 6, 7][offset..offset + data.len()]));
313 /// data.len()
314 /// }, None);
315 /// assert_eq!(consumer.read_element(), Some(3));
316 /// assert_eq!(consumer.read_element(), Some(4));
317 /// assert_eq!(consumer.read_element(), Some(5));
318 /// assert_eq!(consumer.read_element(), Some(6));
319 /// assert_eq!(consumer.read_element(), Some(7));
320 /// assert_eq!(consumer.read_element(), None);
321 /// ```
322 pub fn read_element(&mut self) -> Option<T>
323 where
324 T: Copy,
325 {
326 self.buffer.read_element(&mut self.index)
327 }
328}
329
330unsafe impl<T> Send for Consumer<T> {}
331
/// Storage shared by the producer and consumer handles.
///
/// The element slots are owned through `raw`; `used` is the only state the
/// two sides share and is what synchronizes them. Each side keeps its own
/// cursor outside of this struct and passes it in by `&mut`.
struct DirectRingBuffer<T> {
    /// Owning pointer to the slice of slots; turned back into a `Box` on drop.
    raw: *mut [T],
    /// Pointer to the first slot, used for offset arithmetic.
    buf: NonNull<T>,
    /// Capacity in elements.
    size: usize,
    /// Number of elements written and not yet read.
    used: AtomicUsize,
}

impl<T> DirectRingBuffer<T> {
    /// Returns the number of elements available for reading.
    #[inline]
    fn available_read(&self) -> usize {
        self.used.load(Ordering::Acquire)
    }

    /// Returns the number of elements available for writing.
    #[inline]
    fn available_write(&self) -> usize {
        self.size - self.used.load(Ordering::Acquire)
    }

    /// Returns a pointer to the slot `count` elements past the start.
    ///
    /// Callers in this impl only pass cursor values, which
    /// `wraparound_index` keeps below `size`.
    #[inline]
    fn ptr_at(&self, count: usize) -> *mut T {
        // SAFETY: `count` is a cursor value below `size` (see above), so the
        // resulting pointer stays inside the allocation behind `buf`.
        unsafe { self.buf.as_ptr().add(count) }
    }

    /// Advances `index` by `advance`, resetting it to `0` when it reaches or
    /// passes the end of the storage.
    #[inline]
    fn wraparound_index(&self, index: &mut usize, advance: usize) {
        let next = *index + advance;
        *index = if next >= self.size { 0 } else { next };
    }

    /// Reads a single element from the buffer.
    ///
    /// Returns `None` when nothing is readable; otherwise copies the element
    /// at `index` out, advances `index` and releases the slot.
    fn read_element(&self, index: &mut usize) -> Option<T>
    where
        T: Copy,
    {
        if self.available_read() == 0 {
            return None;
        }
        // SAFETY: at least one published element sits at `index`; `T: Copy`
        // means the bitwise read leaves the slot itself valid.
        let value = unsafe { self.ptr_at(*index).read() };
        self.wraparound_index(index, 1);
        self.used.fetch_sub(1, Ordering::Release);
        Some(value)
    }

    /// Writes a single element to the buffer.
    ///
    /// Returns `false` (dropping `value`) when the buffer is full; otherwise
    /// stores `value` at `index`, advances `index` and publishes the slot.
    fn write_element(&self, index: &mut usize, value: T) -> bool {
        if self.available_write() == 0 {
            return false;
        }
        // Every slot always holds a live `T` (the `Drop` impl below frees all
        // of them as a boxed slice), so the previous occupant has to be taken
        // out and dropped; a plain `ptr::write` would leak it.
        //
        // SAFETY: `index` is below `size` and the slot is in the writable
        // region, so nothing else is accessing it and it holds a valid `T`.
        let previous = unsafe { self.ptr_at(*index).replace(value) };
        self.wraparound_index(index, 1);
        self.used.fetch_add(1, Ordering::Release);
        // Dropped after publishing so the slot is consistent even if the
        // destructor panics.
        drop(previous);
        true
    }

    /// Read/Write common process.
    ///
    /// Offers up to `min(max_size, available)` elements starting at `index`
    /// to `f`, in at most two contiguous parts when the range wraps around
    /// the end of the storage.
    ///
    /// # Arguments
    ///
    /// * `index` - The caller's cursor; advanced by the processed count.
    /// * `available` - Number of elements the caller may touch right now.
    /// * `f` - Called with `(pointer to part, part length, elements processed
    ///   so far)` and returns how many elements of the part it handled.
    ///   Returning less than the part length stops the operation. A return
    ///   value larger than the part length is clamped to the part length.
    /// * `max_size` - Optional cap on the number of elements to process.
    /// * `update_used` - Called once at the end with the `used` counter and
    ///   the total processed count, to publish the result.
    ///
    /// # Returns
    ///
    /// The total number of elements processed.
    fn process_slices(
        &self,
        index: &mut usize,
        available: usize,
        mut f: impl FnMut(*mut T, usize, usize) -> usize,
        max_size: Option<usize>,
        update_used: impl FnOnce(&AtomicUsize, usize),
    ) -> usize {
        let mut total_processed = 0;
        let max_size = max_size.map_or(available, |limit| limit.min(available));

        while total_processed < max_size {
            let part_start = *index;
            // Never run past the end of the storage in a single part.
            let part_len = (self.size - part_start).min(max_size - total_processed);
            // The closure is safe code and may report more than it was given.
            // Trusting that value would push `used` outside `0..=size` and
            // let the producer and consumer regions overlap, so clamp it.
            let processed = f(self.ptr_at(part_start), part_len, total_processed).min(part_len);
            total_processed += processed;
            self.wraparound_index(index, processed);
            if processed < part_len {
                // Aborting the operation because the return value
                // from the closure is smaller than expected.
                break;
            }
        }
        update_used(&self.used, total_processed);
        total_processed
    }
}

impl<T> Drop for DirectRingBuffer<T> {
    /// Frees the storage and drops every element slot.
    fn drop(&mut self) {
        // SAFETY: `raw` is expected to originate from `Box::into_raw` on a
        // boxed slice and is reclaimed exactly once, here.
        unsafe {
            drop(Box::from_raw(self.raw));
        }
    }
}
431
432/// Creates a ring buffer with the specified size.
433///
434/// # Arguments
435///
436/// * `size` - The size of the ring buffer.
437///
438/// # Returns
439///
440/// A tuple containing a `Producer<T>` and a `Consumer<T>`.
441///
442/// # Example
443///
444/// ```
445/// use direct_ring_buffer::create_ring_buffer;
446/// let (mut producer, mut consumer) = create_ring_buffer::<u8>(10);
447/// producer.write_slices(|data, _| {
448/// data.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
449/// 10
450/// }, None);
451///
452/// let mut read_data = vec![0; 10];
453/// consumer.read_slices(|data, _| {
454/// read_data[..data.len()].copy_from_slice(data);
455/// data.len()
456/// }, None);
457/// assert_eq!(read_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
458/// ```
459pub fn create_ring_buffer<T: Default>(size: usize) -> (Producer<T>, Consumer<T>) {
460 let raw = {
461 let mut vec = Vec::<T>::with_capacity(size);
462 vec.resize_with(size, T::default);
463 Box::into_raw(vec.into_boxed_slice())
464 };
465 let buffer = Arc::new(DirectRingBuffer {
466 raw,
467 buf: unsafe { NonNull::new_unchecked(raw as *mut T) },
468 size,
469 used: AtomicUsize::new(0),
470 });
471 (
472 Producer {
473 buffer: Arc::clone(&buffer),
474 index: 0,
475 },
476 Consumer { buffer, index: 0 },
477 )
478}