ringbuf_basedrop/producer.rs
1use basedrop::Shared;
2use core::{
3 mem::{self, MaybeUninit},
4 ptr::copy_nonoverlapping,
5 slice,
6 sync::atomic::Ordering,
7};
8#[cfg(feature = "std")]
9use std::io::{self, Read, Write};
10
11use crate::{consumer::Consumer, ring_buffer::*};
12
13/// Producer part of ring buffer.
14pub struct Producer<T: Send + Sized + 'static> {
15 pub(crate) rb: Shared<RingBuffer<T>>,
16}
17
18impl<T: Send + Sized + 'static> Producer<T> {
19 /// Returns capacity of the ring buffer.
20 ///
21 /// The capacity of the buffer is constant.
22 pub fn capacity(&self) -> usize {
23 self.rb.capacity()
24 }
25
26 /// Checks if the ring buffer is empty.
27 ///
28 /// The result is relevant until you push items to the producer.
29 pub fn is_empty(&self) -> bool {
30 self.rb.is_empty()
31 }
32
33 /// Checks if the ring buffer is full.
34 ///
35 /// *The result may become irrelevant at any time because of concurring activity of the consumer.*
36 pub fn is_full(&self) -> bool {
37 self.rb.is_full()
38 }
39
40 /// The length of the data stored in the buffer.
41 ///
42 /// Actual length may be equal to or less than the returned value.
43 pub fn len(&self) -> usize {
44 self.rb.len()
45 }
46
47 /// The remaining space in the buffer.
48 ///
49 /// Actual remaining space may be equal to or greater than the returning value.
50 pub fn remaining(&self) -> usize {
51 self.rb.remaining()
52 }
53
54 /// Allows to write into ring buffer memory directly.
55 ///
56 /// *This function is unsafe because it gives access to possibly uninitialized memory*
57 ///
58 /// The method takes a function `f` as argument.
59 /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
60 /// First slice contains older elements.
61 ///
62 /// `f` should return number of elements been written.
63 /// *There is no checks for returned number - it remains on the developer's conscience.*
64 ///
65 /// The method **always** calls `f` even if ring buffer is full.
66 ///
67 /// The method returns number returned from `f`.
68 ///
69 /// # Safety
70 ///
71 /// The method gives access to ring buffer underlying memory which may be uninitialized.
72 ///
73 pub unsafe fn push_access<F>(&mut self, f: F) -> usize
74 where
75 F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
76 {
77 let head = self.rb.head.load(Ordering::Acquire);
78 let tail = self.rb.tail.load(Ordering::Acquire);
79 let len = self.rb.data.len();
80
81 let ranges = if tail >= head {
82 if head > 0 {
83 (tail..len, 0..(head - 1))
84 } else if tail < len - 1 {
85 (tail..(len - 1), 0..0)
86 } else {
87 (0..0, 0..0)
88 }
89 } else if tail < head - 1 {
90 (tail..(head - 1), 0..0)
91 } else {
92 (0..0, 0..0)
93 };
94
95 let ptr = self.rb.data.get_mut().as_mut_ptr();
96
97 let slices = (
98 slice::from_raw_parts_mut(ptr.add(ranges.0.start), ranges.0.len()),
99 slice::from_raw_parts_mut(ptr.add(ranges.1.start), ranges.1.len()),
100 );
101
102 let n = f(slices.0, slices.1);
103
104 if n > 0 {
105 let new_tail = (tail + n) % len;
106 self.rb.tail.store(new_tail, Ordering::Release);
107 }
108 n
109 }
110
111 /// Copies data from the slice to the ring buffer in byte-to-byte manner.
112 ///
113 /// The `elems` slice should contain **initialized** data before the method call.
114 /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**.
115 ///
116 /// Returns the number of items been copied.
117 ///
118 /// # Safety
119 ///
120 /// The method copies raw data into the ring buffer.
121 ///
122 /// *You should properly fill the slice and manage remaining elements after copy.*
123 ///
124 pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize {
125 self.push_access(|left, right| -> usize {
126 if elems.len() < left.len() {
127 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len());
128 elems.len()
129 } else {
130 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len());
131 if elems.len() < left.len() + right.len() {
132 copy_nonoverlapping(
133 elems.as_ptr().add(left.len()),
134 right.as_mut_ptr(),
135 elems.len() - left.len(),
136 );
137 elems.len()
138 } else {
139 copy_nonoverlapping(
140 elems.as_ptr().add(left.len()),
141 right.as_mut_ptr(),
142 right.len(),
143 );
144 left.len() + right.len()
145 }
146 }
147 })
148 }
149
150 /// Appends an element to the ring buffer.
151 /// On failure returns an error containing the element that hasn't been appended.
152 pub fn push(&mut self, elem: T) -> Result<(), T> {
153 let mut elem_mu = MaybeUninit::new(elem);
154 let n = unsafe {
155 self.push_access(|slice, _| {
156 if !slice.is_empty() {
157 mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
158 1
159 } else {
160 0
161 }
162 })
163 };
164 match n {
165 0 => Err(unsafe { elem_mu.assume_init() }),
166 1 => Ok(()),
167 _ => unreachable!(),
168 }
169 }
170
171 /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer.
172 ///
173 /// The closure is called until it returns `None` or the ring buffer is full.
174 ///
175 /// The method returns number of elements been put into the buffer.
176 pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize {
177 unsafe {
178 self.push_access(|left, right| {
179 for (i, dst) in left.iter_mut().enumerate() {
180 match f() {
181 Some(e) => dst.as_mut_ptr().write(e),
182 None => return i,
183 };
184 }
185 for (i, dst) in right.iter_mut().enumerate() {
186 match f() {
187 Some(e) => dst.as_mut_ptr().write(e),
188 None => return i + left.len(),
189 };
190 }
191 left.len() + right.len()
192 })
193 }
194 }
195
196 /// Appends elements from an iterator to the ring buffer.
197 /// Elements that haven't been added to the ring buffer remain in the iterator.
198 ///
199 /// Returns count of elements been appended to the ring buffer.
200 pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize {
201 self.push_each(|| elems.next())
202 }
203
204 /// Removes at most `count` elements from the consumer and appends them to the producer.
205 /// If `count` is `None` then as much as possible elements will be moved.
206 /// The producer and consumer parts may be of different buffers as well as of the same one.
207 ///
208 /// On success returns number of elements been moved.
209 pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize {
210 move_items(other, self, count)
211 }
212}
213
214impl<T: Copy + Send + Sized + 'static> Producer<T> {
215 /// Appends elements from slice to the ring buffer.
216 /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
217 ///
218 /// Returns count of elements been appended to the ring buffer.
219 pub fn push_slice(&mut self, elems: &[T]) -> usize {
220 unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) }
221 }
222}
223
224#[cfg(feature = "std")]
225impl Producer<u8> {
226 /// Reads at most `count` bytes
227 /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance
228 /// and appends them to the ring buffer.
229 /// If `count` is `None` then as much as possible bytes will be read.
230 ///
231 /// Returns `Ok(n)` if `read` succeeded. `n` is number of bytes been read.
232 /// `n == 0` means that either `read` returned zero or ring buffer is full.
233 ///
234 /// If `read` is failed or returned an invalid number then error is returned.
235 pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> {
236 let mut err = None;
237 let n = unsafe {
238 self.push_access(|left, _| -> usize {
239 let left = match count {
240 Some(c) => {
241 if c < left.len() {
242 &mut left[0..c]
243 } else {
244 left
245 }
246 }
247 None => left,
248 };
249 match reader
250 .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8]))
251 .and_then(|n| {
252 if n <= left.len() {
253 Ok(n)
254 } else {
255 Err(io::Error::new(
256 io::ErrorKind::InvalidInput,
257 "Read operation returned an invalid number",
258 ))
259 }
260 }) {
261 Ok(n) => n,
262 Err(e) => {
263 err = Some(e);
264 0
265 }
266 }
267 })
268 };
269 match err {
270 Some(e) => Err(e),
271 None => Ok(n),
272 }
273 }
274}
275
276#[cfg(feature = "std")]
277impl Write for Producer<u8> {
278 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
279 let n = self.push_slice(buffer);
280 if n == 0 && !buffer.is_empty() {
281 Err(io::ErrorKind::WouldBlock.into())
282 } else {
283 Ok(n)
284 }
285 }
286
287 fn flush(&mut self) -> io::Result<()> {
288 Ok(())
289 }
290}