// async_ringbuffer/lib.rs

//! An asynchronous, fixed-capacity single-reader single-writer ring buffer that notifies the reader once data becomes available, and notifies the writer once new space for data becomes available. This is done via the AsyncRead and AsyncWrite traits.
2
3#![deny(missing_docs)]
4
5extern crate futures_io;
6
7#[cfg(test)]
8extern crate futures;
9
10use core::pin::Pin;
11use core::task::Context;
12use std::cell::RefCell;
13use std::cmp::min;
14use std::ptr::copy_nonoverlapping;
15use std::rc::Rc;
16
17use futures_io::{AsyncRead, AsyncWrite, Result};
18use std::task::{Poll, Poll::Pending, Poll::Ready, Waker};
19
20mod duplex;
21pub use duplex::Duplex;
22
23/// Creates a new RingBuffer with the given capacity, and returns a handle for
24/// writing and a handle for reading.
25///
26/// # Panics
27/// Panics if capacity is `0` or greater than `isize::max_value()`.
28pub fn ring_buffer(capacity: usize) -> (Writer, Reader) {
29    if capacity == 0 || capacity > (isize::max_value() as usize) {
30        panic!("Invalid ring buffer capacity.");
31    }
32
33    let mut data: Vec<u8> = Vec::with_capacity(capacity);
34    let ptr = data.as_mut_slice().as_mut_ptr();
35
36    let rb = Rc::new(RefCell::new(RingBuffer {
37        data,
38        read: ptr,
39        amount: 0,
40        waker: None,
41        did_shutdown: false,
42    }));
43
44    (Writer(Rc::clone(&rb)), Reader(rb))
45}
46
/// Shared state behind a `Writer`/`Reader` pair. It lives in an
/// `Rc<RefCell<_>>`; the `Rc` strong count doubles as the signal for
/// "has the other half been dropped?".
struct RingBuffer {
    // Fixed-capacity backing storage. Its `len` stays 0; all data access
    // goes through raw pointers into the spare capacity.
    data: Vec<u8>,
    // reading resumes from this position, this always points into the buffer
    read: *mut u8,
    // amount of valid data
    amount: usize,
    // Waker of the task currently parked on this buffer, if any. One slot
    // suffices: with capacity > 0 the buffer cannot be simultaneously empty
    // (reader parks) and full (writer parks).
    waker: Option<Waker>,
    // Set once `poll_close` is called on the `Writer`; never reset.
    did_shutdown: bool,
}
56
/// Returns the signed distance from `other` to `x`, measured in units of `T`.
///
/// Equivalent to `(x - other) / size_of::<T>()` on the raw addresses; callers
/// are expected to pass pointers into (or one past) the same allocation so the
/// byte difference is an exact multiple of the element size.
fn offset_from<T>(x: *const T, other: *const T) -> isize
where
    T: Sized,
{
    let elem_size = std::mem::size_of::<T>() as isize;
    // Distances are meaningless for zero-sized types.
    assert!(elem_size != 0);
    (x as isize).wrapping_sub(other as isize) / elem_size
}
66
67impl RingBuffer {
68    fn park(&mut self, waker: &Waker) {
69        self.waker = Some(waker.clone());
70    }
71
72    fn wake(&mut self) {
73        if let Some(w) = self.waker.take() {
74            w.wake()
75        }
76    }
77
78    fn write_ptr(&mut self) -> *mut u8 {
79        unsafe {
80            let start = self.data.as_mut_slice().as_mut_ptr();
81            let diff = offset_from(self.read.add(self.amount), start.add(self.data.capacity()));
82
83            if diff < 0 {
84                self.read.add(self.amount)
85            } else {
86                start.offset(diff)
87            }
88        }
89    }
90}
91
/// Write access to a nonblocking ring buffer with fixed capacity.
///
/// If there is no space in the buffer to write to, the current task is parked
/// and notified once space becomes available.
// The inner `Rc` is shared with exactly one `Reader`; a strong count of 1
// therefore means the reading half has been dropped.
pub struct Writer(Rc<RefCell<RingBuffer>>);
97
98impl Writer {
99    /// Returns true if the writer has been closed, and will therefore no longer
100    ///  accept writes.
101    pub fn is_closed(&self) -> bool {
102        self.0.borrow().did_shutdown
103    }
104}
105
impl Drop for Writer {
    fn drop(&mut self) {
        // Wake a parked reader so it can re-check the `Rc` strong count,
        // observe that the writer is gone, and return EOF instead of
        // waiting forever.
        self.0.borrow_mut().wake();
    }
}
111
112impl AsyncWrite for Writer {
113    /// Write data to the RingBuffer.
114    ///
115    /// This only returns `Ok(Ready(0))` if either `buf.len() == 0`, `poll_close` has been called,
116    /// or if the corresponding `Reader` has been dropped and no more data will be read to free up
117    /// space for new data.
118    ///
119    /// # Errors
120    /// This never emits an error.
121    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
122        let mut rb = self.0.borrow_mut();
123
124        if buf.is_empty() || rb.did_shutdown {
125            return Ready(Ok(0));
126        }
127
128        let capacity = rb.data.capacity();
129        let start = rb.data.as_mut_slice().as_mut_ptr();
130        let end = unsafe { start.add(capacity) }; // end itself is 1 byte outside the buffer
131
132        if rb.amount == capacity {
133            if Rc::strong_count(&self.0) == 1 {
134                return Ready(Ok(0));
135            } else {
136                rb.park(cx.waker());
137                return Pending;
138            }
139        }
140
141        let buf_ptr = buf.as_ptr();
142        let write_total = min(buf.len(), capacity - rb.amount);
143
144        if (unsafe { rb.write_ptr().add(write_total) } as *const u8) < end {
145            // non-wrapping case
146            unsafe { copy_nonoverlapping(buf_ptr, rb.write_ptr(), write_total) };
147
148            rb.amount += write_total;
149        } else {
150            // wrapping case
151            let distance_we = offset_from(end, rb.write_ptr()) as usize;
152            let remaining: usize = write_total - distance_we;
153
154            unsafe { copy_nonoverlapping(buf_ptr, rb.write_ptr(), distance_we) };
155            unsafe { copy_nonoverlapping(buf_ptr.add(distance_we), start, remaining) };
156
157            rb.amount += write_total;
158        }
159
160        debug_assert!(rb.read >= start);
161        debug_assert!(rb.read < end);
162        debug_assert!(rb.amount <= capacity);
163
164        rb.wake();
165        Ready(Ok(write_total))
166    }
167
168    /// # Errors
169    /// This never emits an error.
170    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
171        Ready(Ok(()))
172    }
173
174    /// Once closing is complete, the corresponding reader will always return `Ok(Ready(0))` on
175    /// `poll_read` once all remaining buffered data has been read.
176    ///
177    /// # Errors
178    /// This never emits an error.
179    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
180        let mut rb = self.0.borrow_mut();
181
182        if !rb.did_shutdown {
183            rb.wake(); // only unpark on first call, makes this method idempotent
184        }
185        rb.did_shutdown = true;
186
187        Ready(Ok(()))
188    }
189}
190
/// Read access to a nonblocking ring buffer with fixed capacity.
///
/// If there is no data in the buffer to read from, the current task is parked
/// and notified once data becomes available.
// The inner `Rc` is shared with exactly one `Writer`; a strong count of 1
// therefore means the writing half has been dropped.
pub struct Reader(Rc<RefCell<RingBuffer>>);
196
197impl Reader {
198    /// Returns true if the writer side of the ringbuffer has been closed.
199    /// Reads will continue to produce data as long as there are still unread
200    /// bytes in the ringbuffer.
201    pub fn is_closed(&self) -> bool {
202        self.0.borrow().did_shutdown
203    }
204}
205
impl Drop for Reader {
    fn drop(&mut self) {
        // Wake a parked writer so it can re-check the `Rc` strong count,
        // observe that the reader is gone, and stop waiting for space.
        self.0.borrow_mut().wake();
    }
}
211
212impl AsyncRead for Reader {
213    /// Read data from the RingBuffer.
214    ///
215    /// This only returns `Ok(Ready(0))` if either `buf.len() == 0`, `poll_close`
216    /// was called on the corresponding `Writer` and all buffered data has been read, or if the
217    /// corresponding `Writer` has been dropped.
218    ///
219    /// # Errors
220    /// This never emits an error.
221    fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
222        let mut rb = self.0.borrow_mut();
223
224        if buf.is_empty() {
225            return Ready(Ok(0));
226        }
227
228        let capacity = rb.data.capacity();
229        let start = rb.data.as_mut_slice().as_mut_ptr();
230        let end = unsafe { start.add(capacity) }; // end itself is 1 byte outside the buffer
231
232        if rb.amount == 0 {
233            if Rc::strong_count(&self.0) == 1 || rb.did_shutdown {
234                return Ready(Ok(0));
235            } else {
236                rb.park(cx.waker());
237                return Pending;
238            }
239        }
240
241        let buf_ptr = buf.as_mut_ptr();
242        let read_total = min(buf.len(), rb.amount);
243
244        if (unsafe { rb.read.add(read_total) } as *const u8) < end {
245            // non-wrapping case
246            unsafe { copy_nonoverlapping(rb.read, buf_ptr, read_total) };
247
248            rb.read = unsafe { rb.read.add(read_total) };
249            rb.amount -= read_total;
250        } else {
251            // wrapping case
252            let distance_re = offset_from(end, rb.read) as usize;
253            let remaining: usize = read_total - distance_re;
254
255            unsafe { copy_nonoverlapping(rb.read, buf_ptr, distance_re) };
256            unsafe { copy_nonoverlapping(start, buf_ptr.add(distance_re), remaining) };
257
258            rb.read = unsafe { start.add(remaining) };
259            rb.amount -= read_total;
260        }
261
262        debug_assert!(rb.read >= start);
263        debug_assert!(rb.read < end);
264        debug_assert!(rb.amount <= capacity);
265
266        rb.wake();
267        Ready(Ok(read_total))
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::future::join;
    use futures::io::{AsyncReadExt, AsyncWriteExt};

    /// Streams 255 bytes through an 8-byte ring buffer, forcing many
    /// wrap-arounds and park/wake handoffs between writer and reader.
    #[test]
    fn it_works() {
        let (mut writer, mut reader) = ring_buffer(8);
        // 255 bytes: 0, 1, ..., 254.
        let data: Vec<u8> = (0..255).collect();
        let write_all = async {
            writer.write_all(&data).await.unwrap();
            // Close so the reader's `read_to_end` can observe EOF and finish.
            writer.close().await.unwrap();
        };

        let mut out: Vec<u8> = Vec::with_capacity(256);
        let read_all = reader.read_to_end(&mut out);

        // Drive both halves concurrently; `.1` is the reader's result.
        block_on(async { join(write_all, read_all).await.1.unwrap() });

        // Bytes must arrive in order and unmodified.
        for (i, byte) in out.iter().enumerate() {
            assert_eq!(*byte, i as u8);
        }
    }

    #[test]
    #[should_panic]
    /// Calling `ring_buffer` with capacity 0 panics
    fn panic_on_capacity_0() {
        let _ = ring_buffer(0);
    }

    #[test]
    #[should_panic]
    /// Calling `ring_buffer` with capacity (isize::max_value() as usize) + 1 panics
    fn panic_on_capacity_too_large() {
        let _ = ring_buffer((isize::max_value() as usize) + 1);
    }

    /// Closing the writer is observable from both halves, makes further
    /// writes fail, and still lets the reader drain buffered data before EOF.
    #[test]
    fn close() {
        let (mut writer, mut reader) = ring_buffer(8);
        block_on(async {
            writer.write_all(&[1, 2, 3, 4, 5]).await.unwrap();
            assert!(!writer.is_closed());
            assert!(!reader.is_closed());

            writer.close().await.unwrap();

            assert!(writer.is_closed());
            assert!(reader.is_closed());

            // After close, `poll_write` returns `Ok(0)`, which `write_all`
            // surfaces as an error.
            let r = writer.write_all(&[6, 7, 8]).await;
            assert!(r.is_err());

            // The 5 bytes buffered before the close are still readable...
            let mut buf = [0; 8];
            let n = reader.read(&mut buf).await.unwrap();
            assert_eq!(n, 5);

            // ...and the following read reports EOF.
            let n = reader.read(&mut buf).await.unwrap();
            assert_eq!(n, 0);
        });
    }
}