ringbuf_basedrop/
producer.rs

1use basedrop::Shared;
2use core::{
3    mem::{self, MaybeUninit},
4    ptr::copy_nonoverlapping,
5    slice,
6    sync::atomic::Ordering,
7};
8#[cfg(feature = "std")]
9use std::io::{self, Read, Write};
10
11use crate::{consumer::Consumer, ring_buffer::*};
12
13/// Producer part of ring buffer.
14pub struct Producer<T: Send + Sized + 'static> {
15    pub(crate) rb: Shared<RingBuffer<T>>,
16}
17
18impl<T: Send + Sized + 'static> Producer<T> {
19    /// Returns capacity of the ring buffer.
20    ///
21    /// The capacity of the buffer is constant.
22    pub fn capacity(&self) -> usize {
23        self.rb.capacity()
24    }
25
26    /// Checks if the ring buffer is empty.
27    ///
28    /// The result is relevant until you push items to the producer.
29    pub fn is_empty(&self) -> bool {
30        self.rb.is_empty()
31    }
32
33    /// Checks if the ring buffer is full.
34    ///
35    /// *The result may become irrelevant at any time because of concurring activity of the consumer.*
36    pub fn is_full(&self) -> bool {
37        self.rb.is_full()
38    }
39
40    /// The length of the data stored in the buffer.
41    ///
42    /// Actual length may be equal to or less than the returned value.
43    pub fn len(&self) -> usize {
44        self.rb.len()
45    }
46
47    /// The remaining space in the buffer.
48    ///
49    /// Actual remaining space may be equal to or greater than the returning value.
50    pub fn remaining(&self) -> usize {
51        self.rb.remaining()
52    }
53
54    /// Allows to write into ring buffer memory directly.
55    ///
56    /// *This function is unsafe because it gives access to possibly uninitialized memory*
57    ///
58    /// The method takes a function `f` as argument.
59    /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
60    /// First slice contains older elements.
61    ///
62    /// `f` should return number of elements been written.
63    /// *There is no checks for returned number - it remains on the developer's conscience.*
64    ///
65    /// The method **always** calls `f` even if ring buffer is full.
66    ///
67    /// The method returns number returned from `f`.
68    ///
69    /// # Safety
70    ///
71    /// The method gives access to ring buffer underlying memory which may be uninitialized.
72    ///
73    pub unsafe fn push_access<F>(&mut self, f: F) -> usize
74    where
75        F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
76    {
77        let head = self.rb.head.load(Ordering::Acquire);
78        let tail = self.rb.tail.load(Ordering::Acquire);
79        let len = self.rb.data.len();
80
81        let ranges = if tail >= head {
82            if head > 0 {
83                (tail..len, 0..(head - 1))
84            } else if tail < len - 1 {
85                (tail..(len - 1), 0..0)
86            } else {
87                (0..0, 0..0)
88            }
89        } else if tail < head - 1 {
90            (tail..(head - 1), 0..0)
91        } else {
92            (0..0, 0..0)
93        };
94
95        let ptr = self.rb.data.get_mut().as_mut_ptr();
96
97        let slices = (
98            slice::from_raw_parts_mut(ptr.add(ranges.0.start), ranges.0.len()),
99            slice::from_raw_parts_mut(ptr.add(ranges.1.start), ranges.1.len()),
100        );
101
102        let n = f(slices.0, slices.1);
103
104        if n > 0 {
105            let new_tail = (tail + n) % len;
106            self.rb.tail.store(new_tail, Ordering::Release);
107        }
108        n
109    }
110
111    /// Copies data from the slice to the ring buffer in byte-to-byte manner.
112    ///
113    /// The `elems` slice should contain **initialized** data before the method call.
114    /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**.
115    ///
116    /// Returns the number of items been copied.
117    ///
118    /// # Safety
119    ///
120    /// The method copies raw data into the ring buffer.
121    ///
122    /// *You should properly fill the slice and manage remaining elements after copy.*
123    ///
124    pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize {
125        self.push_access(|left, right| -> usize {
126            if elems.len() < left.len() {
127                copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len());
128                elems.len()
129            } else {
130                copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len());
131                if elems.len() < left.len() + right.len() {
132                    copy_nonoverlapping(
133                        elems.as_ptr().add(left.len()),
134                        right.as_mut_ptr(),
135                        elems.len() - left.len(),
136                    );
137                    elems.len()
138                } else {
139                    copy_nonoverlapping(
140                        elems.as_ptr().add(left.len()),
141                        right.as_mut_ptr(),
142                        right.len(),
143                    );
144                    left.len() + right.len()
145                }
146            }
147        })
148    }
149
150    /// Appends an element to the ring buffer.
151    /// On failure returns an error containing the element that hasn't been appended.
152    pub fn push(&mut self, elem: T) -> Result<(), T> {
153        let mut elem_mu = MaybeUninit::new(elem);
154        let n = unsafe {
155            self.push_access(|slice, _| {
156                if !slice.is_empty() {
157                    mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
158                    1
159                } else {
160                    0
161                }
162            })
163        };
164        match n {
165            0 => Err(unsafe { elem_mu.assume_init() }),
166            1 => Ok(()),
167            _ => unreachable!(),
168        }
169    }
170
171    /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer.
172    ///
173    /// The closure is called until it returns `None` or the ring buffer is full.
174    ///
175    /// The method returns number of elements been put into the buffer.
176    pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize {
177        unsafe {
178            self.push_access(|left, right| {
179                for (i, dst) in left.iter_mut().enumerate() {
180                    match f() {
181                        Some(e) => dst.as_mut_ptr().write(e),
182                        None => return i,
183                    };
184                }
185                for (i, dst) in right.iter_mut().enumerate() {
186                    match f() {
187                        Some(e) => dst.as_mut_ptr().write(e),
188                        None => return i + left.len(),
189                    };
190                }
191                left.len() + right.len()
192            })
193        }
194    }
195
196    /// Appends elements from an iterator to the ring buffer.
197    /// Elements that haven't been added to the ring buffer remain in the iterator.
198    ///
199    /// Returns count of elements been appended to the ring buffer.
200    pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize {
201        self.push_each(|| elems.next())
202    }
203
204    /// Removes at most `count` elements from the consumer and appends them to the producer.
205    /// If `count` is `None` then as much as possible elements will be moved.
206    /// The producer and consumer parts may be of different buffers as well as of the same one.
207    ///
208    /// On success returns number of elements been moved.
209    pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize {
210        move_items(other, self, count)
211    }
212}
213
214impl<T: Copy + Send + Sized + 'static> Producer<T> {
215    /// Appends elements from slice to the ring buffer.
216    /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
217    ///
218    /// Returns count of elements been appended to the ring buffer.
219    pub fn push_slice(&mut self, elems: &[T]) -> usize {
220        unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) }
221    }
222}
223
224#[cfg(feature = "std")]
225impl Producer<u8> {
226    /// Reads at most `count` bytes
227    /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance
228    /// and appends them to the ring buffer.
229    /// If `count` is `None` then as much as possible bytes will be read.
230    ///
231    /// Returns `Ok(n)` if `read` succeeded. `n` is number of bytes been read.
232    /// `n == 0` means that either `read` returned zero or ring buffer is full.
233    ///
234    /// If `read` is failed or returned an invalid number then error is returned.
235    pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> {
236        let mut err = None;
237        let n = unsafe {
238            self.push_access(|left, _| -> usize {
239                let left = match count {
240                    Some(c) => {
241                        if c < left.len() {
242                            &mut left[0..c]
243                        } else {
244                            left
245                        }
246                    }
247                    None => left,
248                };
249                match reader
250                    .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8]))
251                    .and_then(|n| {
252                        if n <= left.len() {
253                            Ok(n)
254                        } else {
255                            Err(io::Error::new(
256                                io::ErrorKind::InvalidInput,
257                                "Read operation returned an invalid number",
258                            ))
259                        }
260                    }) {
261                    Ok(n) => n,
262                    Err(e) => {
263                        err = Some(e);
264                        0
265                    }
266                }
267            })
268        };
269        match err {
270            Some(e) => Err(e),
271            None => Ok(n),
272        }
273    }
274}
275
276#[cfg(feature = "std")]
277impl Write for Producer<u8> {
278    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
279        let n = self.push_slice(buffer);
280        if n == 0 && !buffer.is_empty() {
281            Err(io::ErrorKind::WouldBlock.into())
282        } else {
283            Ok(n)
284        }
285    }
286
287    fn flush(&mut self) -> io::Result<()> {
288        Ok(())
289    }
290}