spsc_buffer/
lib.rs

1use std::cell::UnsafeCell;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
/// State shared by the producer and consumer halves of the ring buffer.
///
/// The producer only writes into the free region and the consumer only reads
/// from the filled region; `len` is the hand-off point between the two.
struct SpscBuffer {
    /// Backing storage. Every byte lives in its own `UnsafeCell` so that both
    /// halves can reach disjoint regions through a shared reference, without
    /// either of them ever forming a `&mut` to the whole allocation.
    buf: Box<[UnsafeCell<u8>]>,
    /// Number of bytes written by the producer and not yet consumed.
    len: AtomicUsize,
}

impl SpscBuffer {
    /// Allocate zero-initialised storage for `size` bytes with nothing stored.
    fn new(size: usize) -> Self {
        Self {
            buf: (0..size).map(|_| UnsafeCell::new(0)).collect(),
            len: AtomicUsize::new(0),
        }
    }

    /// Number of bytes currently stored.
    fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }

    /// Total number of bytes the storage can hold.
    fn capacity(&self) -> usize {
        self.buf.len()
    }

    /// `true` when no bytes are stored.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// `true` when the stored byte count equals the capacity.
    fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

    /// Raw pointer to the first byte of the storage.
    ///
    /// Obtaining the pointer is safe; dereferencing it is only sound for a
    /// region the calling half currently owns under the SPSC protocol.
    fn data_ptr(&self) -> *mut u8 {
        UnsafeCell::raw_get(self.buf.as_ptr())
    }
}

/// Consumer of the ringbuffer.
pub struct SpscBufferReader {
    /// Index of the next byte to read; always `< capacity` (or 0 when the
    /// capacity is 0).
    start: usize,
    /// Storage shared with the producer.
    buffer: Arc<SpscBuffer>,
}

impl SpscBufferReader {
    /// Get length of contents currently in the buffer
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Get total capacity of the buffer
    pub fn capacity(&self) -> usize {
        self.buffer.capacity()
    }

    /// Check whether the buffer is currently empty
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Check whether the buffer is currently full
    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }

    /// Read data from the buffer. Returns number of bytes read.
    ///
    /// At most one contiguous run is copied per call: the read stops at the
    /// end of the backing storage even if more data is stored past the
    /// wrap-around point, so a short count does not mean the buffer is empty.
    /// Returns 0 when `buf` is empty, when nothing is stored, or when the
    /// buffer has zero capacity.
    pub fn read_to_slice(&mut self, buf: &mut [u8]) -> usize {
        use std::cmp::min;

        let capacity = self.buffer.capacity();
        let stored = self.buffer.len.load(Ordering::SeqCst);

        // Limited by the caller's slice, by what is stored, and by the
        // distance to the end of the storage (no wrap-around in one call).
        let contents_until_end = capacity - self.start;
        let read_size = min(min(buf.len(), stored), contents_until_end);
        if read_size == 0 {
            // Nothing to move. Returning here also keeps a zero-capacity
            // buffer away from the remainder operation below.
            return 0;
        }

        // SAFETY: `start + read_size <= capacity`, so the source range lies
        // inside the allocation. Those bytes were published by the producer's
        // `fetch_add`, which the load above observed, and the producer will
        // not write to them again until the `fetch_sub` below releases them.
        // `buf` is an exclusive borrow of caller memory, so it cannot overlap
        // the ring storage.
        unsafe {
            std::ptr::copy_nonoverlapping(
                self.buffer.data_ptr().add(self.start),
                buf.as_mut_ptr(),
                read_size,
            );
        }

        self.start = (self.start + read_size) % capacity;
        self.buffer.len.fetch_sub(read_size, Ordering::SeqCst);

        read_size
    }
}

impl Read for SpscBufferReader {
    /// Delegates to [`SpscBufferReader::read_to_slice`]; never returns an error.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Ok(self.read_to_slice(buf))
    }
}

// SAFETY: the `&self` methods only load the atomic length and read the
// immutable slice length, so sharing a reference between threads is fine.
unsafe impl Sync for SpscBufferReader {}
// SAFETY: the reader owns an index and an `Arc` to the shared storage; every
// access to the storage bytes is ordered against the producer through `len`.
unsafe impl Send for SpscBufferReader {}

/// Producer for the ringbuffer
pub struct SpscBufferWriter {
    /// Index of the next byte to write; always `< capacity` (or 0 when the
    /// capacity is 0).
    end: usize,
    /// Storage shared with the consumer.
    buffer: Arc<SpscBuffer>,
}

impl SpscBufferWriter {
    /// Get length of contents currently in the buffer
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Get total capacity of the buffer
    pub fn capacity(&self) -> usize {
        self.buffer.capacity()
    }

    /// Check whether the buffer is currently empty
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Check whether the buffer is currently full
    pub fn is_full(&self) -> bool {
        self.buffer.is_full()
    }

    /// Write data to the buffer. Returns number of bytes written.
    ///
    /// At most one contiguous run is copied per call: the write stops at the
    /// end of the backing storage even if more free space exists past the
    /// wrap-around point, so a short count does not mean the buffer is full.
    /// Returns 0 when `buf` is empty, when the buffer is full, or when the
    /// buffer has zero capacity.
    pub fn write_from_slice(&mut self, buf: &[u8]) -> usize {
        use std::cmp::min;

        let capacity = self.buffer.capacity();
        let stored = self.buffer.len.load(Ordering::SeqCst);

        // Limited by the caller's slice, by the free space, and by the
        // distance to the end of the storage (no wrap-around in one call).
        let space_until_end = capacity - self.end;
        let write_size = min(min(buf.len(), capacity - stored), space_until_end);
        if write_size == 0 {
            // Nothing to move. Returning here also keeps a zero-capacity
            // buffer away from the remainder operation below.
            return 0;
        }

        // SAFETY: `end + write_size <= capacity`, so the destination range
        // lies inside the allocation. Those bytes are free: the consumer
        // released them with a `fetch_sub` that the load above observed, and
        // it will not read them until the `fetch_add` below publishes them.
        // `buf` is a borrow of caller memory, so it cannot overlap the ring
        // storage.
        unsafe {
            std::ptr::copy_nonoverlapping(
                buf.as_ptr(),
                self.buffer.data_ptr().add(self.end),
                write_size,
            );
        }

        self.end = (self.end + write_size) % capacity;
        self.buffer.len.fetch_add(write_size, Ordering::SeqCst);

        write_size
    }
}

// SAFETY: the `&self` methods only load the atomic length and read the
// immutable slice length, so sharing a reference between threads is fine.
unsafe impl Sync for SpscBufferWriter {}
// SAFETY: the writer owns an index and an `Arc` to the shared storage; every
// access to the storage bytes is ordered against the consumer through `len`.
unsafe impl Send for SpscBufferWriter {}

impl Write for SpscBufferWriter {
    /// Delegates to [`SpscBufferWriter::write_from_slice`]; never returns an error.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Ok(self.write_from_slice(buf))
    }

    /// Nothing is buffered outside the ring itself, so this is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
156
157/// Create a new SPSC buffer pair.
158///
159/// The producer and consumer can safely be transferred between threads; the
160/// expected use case is that one thread will be writing and one will be reading.
161///
162/// The underlying buffer's size is synchronised using an atomic. The producer
163/// and consumer have methods to query the size and the capacity, which is
164/// guaranteed to be consistent between threads but may not be sufficient to
165/// prevent races depending on what you are trying to achieve.
166///
167/// See the mio-anonymous-pipes crate for example usage.
168pub fn spsc_buffer(size: usize) -> (SpscBufferWriter, SpscBufferReader) {
169    let buffer = Arc::new(SpscBuffer::new(size));
170
171    let producer = SpscBufferWriter {
172        end: 0,
173        buffer: buffer.clone(),
174    };
175    let consumer = SpscBufferReader { start: 0, buffer };
176
177    (producer, consumer)
178}
179
#[cfg(test)]
mod test {
    use super::*;

    /// Pushes 100 bytes through a 60-byte buffer in two passes and checks the
    /// reported sizes on both halves after every step.
    #[test]
    fn test_spsc_buffer() {
        let input = [1u8; 100];
        let (mut producer, mut consumer) = spsc_buffer(60);

        // A fresh pair is empty and both halves agree on the capacity.
        assert!(producer.is_empty());
        assert!(consumer.is_empty());
        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);
        assert_eq!(producer.capacity(), 60);
        assert_eq!(consumer.capacity(), 60);

        let mut output = [0u8; 100];

        // First pass: the write is capped at the capacity.
        assert_eq!(producer.write_from_slice(&input), 60);
        assert_eq!(producer.len(), 60);
        assert_eq!(consumer.len(), 60);

        assert_eq!(consumer.read_to_slice(&mut output), 60);
        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        // Second pass: the remaining 40 bytes.
        let (input_rest, output_rest) = (&input[60..], &mut output[60..]);

        assert_eq!(producer.write_from_slice(input_rest), 40);
        assert_eq!(producer.len(), 40);
        assert_eq!(consumer.len(), 40);

        assert_eq!(consumer.read_to_slice(output_rest), 40);
        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        // Everything that went in came out unchanged.
        assert_eq!(&input[..], &output[..]);
    }
}