1#![deny(missing_docs)]
4
5extern crate futures_io;
6
7#[cfg(test)]
8extern crate futures;
9
10use core::pin::Pin;
11use core::task::Context;
12use std::cell::RefCell;
13use std::cmp::min;
14use std::ptr::copy_nonoverlapping;
15use std::rc::Rc;
16
17use futures_io::{AsyncRead, AsyncWrite, Result};
18use std::task::{Poll, Poll::Pending, Poll::Ready, Waker};
19
20mod duplex;
21pub use duplex::Duplex;
22
23pub fn ring_buffer(capacity: usize) -> (Writer, Reader) {
29 if capacity == 0 || capacity > (isize::max_value() as usize) {
30 panic!("Invalid ring buffer capacity.");
31 }
32
33 let mut data: Vec<u8> = Vec::with_capacity(capacity);
34 let ptr = data.as_mut_slice().as_mut_ptr();
35
36 let rb = Rc::new(RefCell::new(RingBuffer {
37 data,
38 read: ptr,
39 amount: 0,
40 waker: None,
41 did_shutdown: false,
42 }));
43
44 (Writer(Rc::clone(&rb)), Reader(rb))
45}
46
/// State shared between the [`Writer`] and [`Reader`] halves.
struct RingBuffer {
    /// Backing allocation. Only its capacity is used; bytes are accessed
    /// through raw pointers and the vector's length is never changed.
    data: Vec<u8>,
    /// Pointer to the next byte the reader will consume.
    read: *mut u8,
    /// Number of bytes currently stored and not yet read.
    amount: usize,
    /// Waker of the task parked on this buffer, if any.
    waker: Option<Waker>,
    /// Set once the writer has been closed.
    did_shutdown: bool,
}
56
/// Returns how many `T`-sized elements lie between `other` and `x`
/// (positive when `x` has the higher address).
///
/// The distance is computed from the raw addresses with wrapping integer
/// subtraction, so no pointer is dereferenced.
///
/// # Panics
///
/// Panics if `T` is a zero-sized type.
fn offset_from<T>(x: *const T, other: *const T) -> isize {
    let size = std::mem::size_of::<T>();
    assert!(size != 0);

    let lhs_addr = x as isize;
    let rhs_addr = other as isize;
    lhs_addr.wrapping_sub(rhs_addr) / size as isize
}
66
67impl RingBuffer {
68 fn park(&mut self, waker: &Waker) {
69 self.waker = Some(waker.clone());
70 }
71
72 fn wake(&mut self) {
73 if let Some(w) = self.waker.take() {
74 w.wake()
75 }
76 }
77
78 fn write_ptr(&mut self) -> *mut u8 {
79 unsafe {
80 let start = self.data.as_mut_slice().as_mut_ptr();
81 let diff = offset_from(self.read.add(self.amount), start.add(self.data.capacity()));
82
83 if diff < 0 {
84 self.read.add(self.amount)
85 } else {
86 start.offset(diff)
87 }
88 }
89 }
90}
91
92pub struct Writer(Rc<RefCell<RingBuffer>>);
97
98impl Writer {
99 pub fn is_closed(&self) -> bool {
102 self.0.borrow().did_shutdown
103 }
104}
105
106impl Drop for Writer {
107 fn drop(&mut self) {
108 self.0.borrow_mut().wake();
109 }
110}
111
112impl AsyncWrite for Writer {
113 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize>> {
122 let mut rb = self.0.borrow_mut();
123
124 if buf.is_empty() || rb.did_shutdown {
125 return Ready(Ok(0));
126 }
127
128 let capacity = rb.data.capacity();
129 let start = rb.data.as_mut_slice().as_mut_ptr();
130 let end = unsafe { start.add(capacity) }; if rb.amount == capacity {
133 if Rc::strong_count(&self.0) == 1 {
134 return Ready(Ok(0));
135 } else {
136 rb.park(cx.waker());
137 return Pending;
138 }
139 }
140
141 let buf_ptr = buf.as_ptr();
142 let write_total = min(buf.len(), capacity - rb.amount);
143
144 if (unsafe { rb.write_ptr().add(write_total) } as *const u8) < end {
145 unsafe { copy_nonoverlapping(buf_ptr, rb.write_ptr(), write_total) };
147
148 rb.amount += write_total;
149 } else {
150 let distance_we = offset_from(end, rb.write_ptr()) as usize;
152 let remaining: usize = write_total - distance_we;
153
154 unsafe { copy_nonoverlapping(buf_ptr, rb.write_ptr(), distance_we) };
155 unsafe { copy_nonoverlapping(buf_ptr.add(distance_we), start, remaining) };
156
157 rb.amount += write_total;
158 }
159
160 debug_assert!(rb.read >= start);
161 debug_assert!(rb.read < end);
162 debug_assert!(rb.amount <= capacity);
163
164 rb.wake();
165 Ready(Ok(write_total))
166 }
167
168 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
171 Ready(Ok(()))
172 }
173
174 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<()>> {
180 let mut rb = self.0.borrow_mut();
181
182 if !rb.did_shutdown {
183 rb.wake(); }
185 rb.did_shutdown = true;
186
187 Ready(Ok(()))
188 }
189}
190
191pub struct Reader(Rc<RefCell<RingBuffer>>);
196
197impl Reader {
198 pub fn is_closed(&self) -> bool {
202 self.0.borrow().did_shutdown
203 }
204}
205
206impl Drop for Reader {
207 fn drop(&mut self) {
208 self.0.borrow_mut().wake();
209 }
210}
211
212impl AsyncRead for Reader {
213 fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize>> {
222 let mut rb = self.0.borrow_mut();
223
224 if buf.is_empty() {
225 return Ready(Ok(0));
226 }
227
228 let capacity = rb.data.capacity();
229 let start = rb.data.as_mut_slice().as_mut_ptr();
230 let end = unsafe { start.add(capacity) }; if rb.amount == 0 {
233 if Rc::strong_count(&self.0) == 1 || rb.did_shutdown {
234 return Ready(Ok(0));
235 } else {
236 rb.park(cx.waker());
237 return Pending;
238 }
239 }
240
241 let buf_ptr = buf.as_mut_ptr();
242 let read_total = min(buf.len(), rb.amount);
243
244 if (unsafe { rb.read.add(read_total) } as *const u8) < end {
245 unsafe { copy_nonoverlapping(rb.read, buf_ptr, read_total) };
247
248 rb.read = unsafe { rb.read.add(read_total) };
249 rb.amount -= read_total;
250 } else {
251 let distance_re = offset_from(end, rb.read) as usize;
253 let remaining: usize = read_total - distance_re;
254
255 unsafe { copy_nonoverlapping(rb.read, buf_ptr, distance_re) };
256 unsafe { copy_nonoverlapping(start, buf_ptr.add(distance_re), remaining) };
257
258 rb.read = unsafe { start.add(remaining) };
259 rb.amount -= read_total;
260 }
261
262 debug_assert!(rb.read >= start);
263 debug_assert!(rb.read < end);
264 debug_assert!(rb.amount <= capacity);
265
266 rb.wake();
267 Ready(Ok(read_total))
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use futures::future::join;
    use futures::io::{AsyncReadExt, AsyncWriteExt};

    /// Streams 255 bytes through an 8-byte buffer, which forces many
    /// wrap-arounds, and checks that the reader sees exactly what was written.
    #[test]
    fn it_works() {
        let (mut writer, mut reader) = ring_buffer(8);
        let data: Vec<u8> = (0..255).collect();
        let write_all = async {
            writer.write_all(&data).await.unwrap();
            writer.close().await.unwrap();
        };

        let mut out: Vec<u8> = Vec::with_capacity(256);
        let read_all = reader.read_to_end(&mut out);

        block_on(async { join(write_all, read_all).await.1.unwrap() });

        // Compare the whole payload: a per-byte loop over `out` alone would
        // pass vacuously if fewer bytes (or none) had been received.
        assert_eq!(out, data);
    }

    /// A zero capacity is rejected.
    #[test]
    #[should_panic]
    fn panic_on_capacity_0() {
        let _ = ring_buffer(0);
    }

    /// A capacity above `isize::max_value()` is rejected.
    #[test]
    #[should_panic]
    fn panic_on_capacity_too_large() {
        let _ = ring_buffer((isize::max_value() as usize) + 1);
    }

    /// Closing the writer is visible on both halves, makes further writes
    /// fail, and still lets the reader drain what was stored before EOF.
    #[test]
    fn close() {
        let (mut writer, mut reader) = ring_buffer(8);
        block_on(async {
            writer.write_all(&[1, 2, 3, 4, 5]).await.unwrap();
            assert!(!writer.is_closed());
            assert!(!reader.is_closed());

            writer.close().await.unwrap();

            assert!(writer.is_closed());
            assert!(reader.is_closed());

            let r = writer.write_all(&[6, 7, 8]).await;
            assert!(r.is_err());

            let mut buf = [0; 8];
            let n = reader.read(&mut buf).await.unwrap();
            assert_eq!(n, 5);
            assert_eq!(&buf[..n], &[1, 2, 3, 4, 5]);

            let n = reader.read(&mut buf).await.unwrap();
            assert_eq!(n, 0);
        });
    }
}