/*!
# Direct Ring Buffer
This crate provides a high-performance, lock-free ring buffer for single-producer, single-consumer scenarios. The main components of this crate are the `Producer` and `Consumer` structures, which allow for efficient data writing and reading, respectively.
## Overview
A ring buffer is a fixed-size buffer that works as a circular queue. This implementation is lock-free, which makes it suitable for real-time applications where low latency is crucial. The buffer is generic over element types that implement `Copy`; the bound rules out types that implement `Drop`, which matters because the storage is allocated uninitialized.
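
To make the circular layout concrete, here is a minimal sketch of how a logical run of elements maps onto at most two contiguous index ranges of the storage. The helper `contiguous_parts` is hypothetical and exists only for illustration; it is not part of this crate's API.

```rust
use std::ops::Range;

/// Splits a run of `count` elements starting at `start` into at most two
/// contiguous index ranges of a circular buffer with `capacity` slots.
/// Hypothetical helper for illustration only.
fn contiguous_parts(capacity: usize, start: usize, count: usize) -> (Range<usize>, Range<usize>) {
    assert!(start < capacity && count <= capacity);
    // First part: from `start` up to the end of the storage at most.
    let first_len = count.min(capacity - start);
    // Second part: whatever is left continues from index 0.
    let second_len = count - first_len;
    (start..start + first_len, 0..second_len)
}
```

For example, with 5 slots, a run of 4 elements starting at index 3 covers `3..5` and then `0..2`; a run that does not reach the end of the storage yields an empty second range.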
## Features
- **Single-Producer, Single-Consumer**: Designed for scenarios with a single writer and a single reader.
- **Lock-Free**: Utilizes atomic operations for synchronization, avoiding the need for mutexes or other locking mechanisms.
- **Slice-Based I/O**: Supports reading and writing multiple elements at a time using slices, enhancing performance for bulk operations.
- **Closure-Based Access**: Provides direct access to the buffer through closures, allowing for flexible and efficient data processing.
- **Generic Types with Copy Trait**: Elements must implement `Copy`. The bound is not there for copying elements; it excludes types that implement `Drop`, because the buffer is allocated uninitialized.
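
The last point can be checked with the standard library alone. A type cannot implement both `Copy` and `Drop`, so a `T: Copy` bound guarantees that no destructor would ever run on a slot. The sketch below assumes a hypothetical helper, `slot_needs_drop`, which is not part of this crate; it reports what `std::mem::needs_drop` says about a candidate element type.

```rust
use std::mem::needs_drop;

/// Reports whether dropping a `T` would run any code.
/// Only compiles for `T: Copy`, the same bound `create_ring_buffer` uses.
/// Hypothetical helper for illustration only.
fn slot_needs_drop<T: Copy>() -> bool {
    needs_drop::<T>()
}
```

In practice `slot_needs_drop::<u8>()` and `slot_needs_drop::<[f32; 2]>()` are `false`, while a call such as `slot_needs_drop::<String>()` is rejected at compile time because `String` is not `Copy`.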
## Example
```rust
use direct_ring_buffer::{create_ring_buffer, Producer, Consumer};
let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
// Writing data to the buffer
producer.write(|data, _| {
data[..3].copy_from_slice(&[1, 2, 3]);
3
}, None);
// Reading data from the buffer
consumer.read(|data, _| {
assert_eq!(data, &[1, 2, 3]);
data.len()
}, None);
```
In this example, a ring buffer of size 5 is created. The producer writes 3 elements into the buffer, and the consumer reads them back, verifying the data.
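
The example above never reaches the end of the storage. When the writable or readable region wraps around, the closure may be called twice, and its second argument tells it how many elements earlier calls already handled. The sketch below imitates that calling convention with a hypothetical stand-in, `for_each_part`. It is an assumption for illustration, not this crate's code, and it works on a plain slice instead of a shared buffer.

```rust
/// Calls `f` once per contiguous part of a region that may wrap around,
/// passing the part and the number of elements handled so far.
/// Stops early if `f` returns less than the part's length.
/// Hypothetical stand-in for illustration only; assumes `start < storage.len()`.
fn for_each_part(
    storage: &mut [u8],
    start: usize,
    count: usize,
    mut f: impl FnMut(&mut [u8], usize) -> usize,
) -> usize {
    let capacity = storage.len();
    let mut index = start;
    let mut total = 0;
    while total < count {
        // A part never extends past the end of the storage.
        let part_len = (capacity - index).min(count - total);
        // Clamp so the closure cannot report more than it was given.
        let done = f(&mut storage[index..index + part_len], total).min(part_len);
        total += done;
        index = (index + done) % capacity;
        if done < part_len {
            break; // the closure interrupted the operation
        }
    }
    total
}
```

Filling 5 slots from index 2 calls the closure with a 3-element slice at offset `0` and then a 2-element slice at offset `3`, so a closure can index its own source data with `offset..offset + data.len()`.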
## Safety and Performance
The implementation uses `unsafe` blocks internally to hand out slices of the buffer. Each slice is limited to the number of elements currently available and never crosses the end of the storage. Synchronization relies on atomic operations only, which keeps the overhead low and makes the buffer suitable for high-performance applications.
- **Optimized for Bulk Operations**: Designed to handle multiple elements at once, reducing overhead for batch processing. Single-element operations may incur significant overhead.
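
The synchronization idea can be sketched with a shared atomic counter of occupied slots: the writer publishes with `Release` after storing, and the reader observes with `Acquire` before loading. The function `transfer` below is a hypothetical, self-contained illustration. It stores elements in `AtomicU8` slots so that no `unsafe` is needed, which is an assumption of the sketch and not how this crate stores its elements.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Sends `items` through a tiny shared ring of `CAP` slots and returns what
/// the consuming side received. Hypothetical sketch for illustration only.
fn transfer(items: Vec<u8>) -> Vec<u8> {
    const CAP: usize = 4;
    let slots: Arc<Vec<AtomicU8>> = Arc::new((0..CAP).map(|_| AtomicU8::new(0)).collect());
    let used = Arc::new(AtomicUsize::new(0));
    let total = items.len();

    let producer = {
        let (slots, used) = (Arc::clone(&slots), Arc::clone(&used));
        thread::spawn(move || {
            for (i, item) in items.into_iter().enumerate() {
                // Wait for a free slot; Acquire pairs with the reader's Release.
                while used.load(Ordering::Acquire) == CAP {
                    thread::yield_now();
                }
                slots[i % CAP].store(item, Ordering::Relaxed);
                // Publish the slot: Release makes the store above visible to
                // a reader that observes the new count with Acquire.
                used.fetch_add(1, Ordering::Release);
            }
        })
    };

    let mut received = Vec::with_capacity(total);
    for i in 0..total {
        // Wait for data; Acquire pairs with the writer's Release.
        while used.load(Ordering::Acquire) == 0 {
            thread::yield_now();
        }
        received.push(slots[i % CAP].load(Ordering::Relaxed));
        // Hand the slot back to the writer.
        used.fetch_sub(1, Ordering::Release);
    }
    producer.join().unwrap();
    received
}
```

The sketch moves one element per counter update, which is exactly the pattern the bullet above warns about: every element pays for an atomic read-modify-write. Handling a whole slice per update spreads that cost over many elements.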
*/
use std::{
cell::UnsafeCell,
slice::{from_raw_parts, from_raw_parts_mut},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// Producer part of the ring buffer.
pub struct Producer<T> {
buffer: Arc<DirectRingBuffer<T>>,
index: usize,
}
impl<T> Producer<T> {
/// Returns the number of elements available for writing.
///
/// # Returns
///
/// The number of free slots, that is, the buffer size minus the number of
/// elements written but not yet read.
///
/// # Example
///
/// ```
/// use direct_ring_buffer::{create_ring_buffer};
///
/// let (producer, _) = create_ring_buffer::<u8>(5);
/// assert_eq!(producer.available(), 5);
/// ```
pub fn available(&self) -> usize {
self.buffer.available_write()
}
/// Writes data to the ring buffer.
///
/// This method writes data to the ring buffer using the provided closure.
/// The closure `f` receives a mutable slice of writable elements and the
/// current offset within the write operation, and it should return the number of elements written.
/// The `max_size` parameter specifies the maximum number of elements to
/// write. If `None`, the method attempts to write as many elements as available.
///
/// If there is no space available for writing, the function returns immediately without blocking, and the closure is not called.
///
/// # Arguments
///
/// * `f` - A closure for writing elements. It takes a mutable slice of
/// writable elements and an offset, and returns the number of
/// elements written. The closure will not be called if there are no
/// writable elements. If the buffer wraps around, the closure may be
/// called twice. The slice passed to the closure contains the
/// currently writable elements. The offset is `0` for the first call
/// and increases by the number of elements written in subsequent calls.
/// If the closure returns a value smaller than the length of the slice passed to it,
/// the write operation stops early and only that many elements are counted as written.
/// * `max_size` - An optional parameter specifying the maximum number of
/// elements to write. If `None`, the method will write up to
/// the number of available elements.
///
/// # Returns
///
/// The number of elements written.
///
/// # Example
///
/// ```
/// use direct_ring_buffer::{create_ring_buffer, Producer};
///
/// let (mut producer, _) = create_ring_buffer::<u8>(5);
/// producer.write(|data, _| {
/// data[..3].copy_from_slice(&[1, 2, 3]);
/// 3
/// }, None);
///
/// producer.write(|data, _| {
/// data[..2].copy_from_slice(&[4, 5]);
/// 2
/// }, None);
/// assert_eq!(producer.available(), 0);
/// ```
pub fn write(
&mut self,
mut f: impl FnMut(&mut [T], usize) -> usize,
max_size: Option<usize>,
) -> usize {
let available = self.available();
self.buffer.process_slices(
&mut self.index,
available,
|buf, len, process_offset| {
f(
// SAFETY: `process_slices` limits `len` so that the slice
// never crosses the end of the buffer.
unsafe { from_raw_parts_mut(buf, len) },
process_offset,
)
},
max_size,
|atomic, processed| {
atomic.fetch_add(processed, Ordering::Release);
},
)
}
}
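// SAFETY: the producer moves values of `T` to another thread, so `T` itself
// must be `Send` (a `Copy` type such as a raw pointer is not).
unsafe impl<T: Send> Send for Producer<T> {}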
/// Consumer part of the ring buffer.
pub struct Consumer<T> {
buffer: Arc<DirectRingBuffer<T>>,
index: usize,
}
impl<T> Consumer<T> {
/// Returns the number of elements available for reading.
///
/// # Returns
///
/// The number of elements written but not yet read.
///
/// # Example
/// ```
/// use direct_ring_buffer::{create_ring_buffer};
///
/// let (_, consumer) = create_ring_buffer::<u8>(5);
/// assert_eq!(consumer.available(), 0);
/// ```
pub fn available(&self) -> usize {
self.buffer.available_read()
}
/// Reads data from the ring buffer.
///
/// This method reads data from the ring buffer using the provided closure.
/// The closure `f` receives a slice of readable elements and the current
/// offset within the read operation, and it should return the number of elements read.
/// The `max_size` parameter specifies the maximum number of elements to
/// read. If `None`, the method attempts to read as many elements as available.
///
/// If there is no data available for reading, the function returns immediately without blocking, and the closure is not called.
///
/// # Arguments
///
/// * `f` - A closure that processes the readable elements. It takes a
/// reference to a slice of readable elements and an offset as
/// arguments, and returns the number of elements read. The closure
/// will not be called if there are no readable elements. If the
/// buffer wraps around, the closure may be called twice. The slice
/// passed to the closure contains the currently accessible elements.
/// The offset is `0` for the first call and increases by the number
/// of elements read in subsequent calls.
/// If the closure returns a value smaller than the length of the slice passed to it,
/// the read operation stops early and only that many elements are counted as read.
/// * `max_size` - An optional parameter specifying the maximum number of
/// elements to read. If `None`, the method will read up to
/// the number of available elements.
///
/// # Returns
///
/// The number of elements read.
///
/// # Example
///
/// ```
/// use direct_ring_buffer::{create_ring_buffer};
///
/// let (mut producer, mut consumer) = create_ring_buffer::<u8>(5);
/// producer.write(|data, offset| {
/// assert_eq!(data.len(), 5);
/// data[..2].copy_from_slice(&[1, 2]);
/// 2
/// }, None);
/// consumer.read(|data, offset| {
/// assert_eq!(data.len(), 2);
/// assert_eq!(offset, 0);
/// 2
/// }, None);
/// producer.write(|data, offset| {
/// data.copy_from_slice(&([3, 4, 5, 6, 7][offset..offset + data.len()]));
/// data.len()
/// }, None);
/// consumer.read(|data, offset| {
/// assert_eq!(data, &([3, 4, 5, 6, 7][offset..offset + data.len()]));
/// data.len()
/// }, None);
///
/// ```
pub fn read(
&mut self,
mut f: impl FnMut(&[T], usize) -> usize,
max_size: Option<usize>,
) -> usize {
let available = self.available();
self.buffer.process_slices(
&mut self.index,
available,
|buf, len, process_offset| {
f(
// SAFETY: `process_slices` limits `len` so that the slice
// never crosses the end of the buffer.
unsafe { from_raw_parts(buf, len) },
process_offset,
)
},
max_size,
|atomic, processed| {
atomic.fetch_sub(processed, Ordering::Release);
},
)
}
}
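// SAFETY: the consumer receives values of `T` on another thread, so `T`
// itself must be `Send` (a `Copy` type such as a raw pointer is not).
unsafe impl<T: Send> Send for Consumer<T> {}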
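/// State shared by a `Producer`/`Consumer` pair.
struct DirectRingBuffer<T> {
/// Backing storage for the elements.
elements: UnsafeCell<Box<[T]>>,
/// Number of elements written but not yet read.
used: AtomicUsize,
}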
impl<T> DirectRingBuffer<T> {
/// Returns the number of elements available for reading.
fn available_read(&self) -> usize {
self.used.load(Ordering::Acquire)
}
/// Returns the number of elements available for writing.
fn available_write(&self) -> usize {
unsafe { &*self.elements.get() }.len() - self.used.load(Ordering::Acquire)
}
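/// Shared implementation of `read` and `write` (internal).
///
/// Calls `f` once per contiguous part of the region starting at `index`,
/// at most twice when the region wraps around, then passes the total
/// number of processed elements to `update_used`.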
fn process_slices(
&self,
index: &mut usize,
available: usize,
mut f: impl FnMut(*mut T, usize, usize) -> usize,
max_size: Option<usize>,
update_used: impl FnOnce(&AtomicUsize, usize),
) -> usize {
let buffer = unsafe { &mut *self.elements.get() };
let buffer_len = buffer.len();
let mut total_processed = 0;
let max_size = max_size.unwrap_or(available).min(available);
while total_processed < max_size {
let part_start = *index;
let part_len = (buffer_len - part_start).min(max_size - total_processed);
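// Clamp the closure's return value so that it can never claim more
// elements than the slice it was given.
let processed = f(
unsafe { buffer.get_unchecked_mut(part_start) },
part_len,
total_processed,
)
.min(part_len);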
total_processed += processed;
*index += processed;
if *index >= buffer_len {
*index = 0
}
if processed < part_len {
// Abort the operation because the closure returned
// a value smaller than the length of the slice.
break;
}
}
update_used(&self.used, total_processed);
total_processed
}
}
/// Creates a ring buffer with the specified size.
///
/// # Arguments
///
/// * `size` - The capacity of the ring buffer, in elements.
///
/// # Returns
///
/// A tuple containing a `Producer<T>` and a `Consumer<T>`.
///
/// # Example
///
/// ```
/// use direct_ring_buffer::create_ring_buffer;
/// let (mut producer, mut consumer) = create_ring_buffer::<u8>(10);
/// producer.write(|data, _| {
/// data.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
/// 10
/// }, None);
///
/// let mut read_data = vec![0; 10];
/// consumer.read(|data, _| {
/// read_data[..data.len()].copy_from_slice(data);
/// data.len()
/// }, None);
/// assert_eq!(read_data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
/// ```
pub fn create_ring_buffer<T: Copy>(size: usize) -> (Producer<T>, Consumer<T>) {
let buffer = Arc::new(DirectRingBuffer {
elements: UnsafeCell::new({
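// Allocate `size` slots without initializing them. `T: Copy` rules out
// `Drop`, so no destructor ever runs on an uninitialized slot.
let mut vec = Vec::<T>::with_capacity(size);
unsafe { vec.set_len(size) };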
vec.into_boxed_slice()
}),
used: AtomicUsize::new(0),
});
(
Producer {
buffer: Arc::clone(&buffer),
index: 0,
},
Consumer { buffer, index: 0 },
)
}