1use std::cell::UnsafeCell;
2use std::io::{self, Read, Write};
3use std::mem;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6
/// Fixed-size byte storage plus a fill counter, shared through an `Arc`
/// by one [`SpscBufferWriter`] and one [`SpscBufferReader`].
struct SpscBuffer {
    /// Backing bytes. Kept in an `UnsafeCell` because both handles reach the
    /// storage through a shared `Arc` rather than through `&mut`.
    buf: UnsafeCell<Box<[u8]>>,
    /// Number of bytes currently stored: the writer adds to it, the reader
    /// subtracts from it.
    len: AtomicUsize,
}

impl SpscBuffer {
    /// Allocates `size` zeroed bytes of storage with nothing stored yet.
    fn new(size: usize) -> Self {
        let storage: Box<[u8]> = vec![0u8; size].into_boxed_slice();
        SpscBuffer {
            buf: UnsafeCell::new(storage),
            len: AtomicUsize::new(0),
        }
    }

    /// Number of bytes currently stored.
    fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }

    /// Size of the backing storage in bytes, as passed to `new`.
    fn capacity(&self) -> usize {
        // SAFETY: only the length of the boxed slice is read here; nothing
        // visible in this file replaces or resizes the box after `new`.
        let storage: &Box<[u8]> = unsafe { &*self.buf.get() };
        storage.len()
    }

    /// `true` when no bytes are stored.
    fn is_empty(&self) -> bool {
        let stored = self.len();
        stored == 0
    }

    /// `true` when the stored byte count equals the capacity.
    fn is_full(&self) -> bool {
        let stored = self.len();
        stored == self.capacity()
    }
}
36
37pub struct SpscBufferReader {
39 start: usize,
40 buffer: Arc<SpscBuffer>,
41}
42
43impl SpscBufferReader {
44 pub fn len(&self) -> usize {
46 self.buffer.len()
47 }
48
49 pub fn capacity(&self) -> usize {
51 self.buffer.capacity()
52 }
53
54 pub fn is_empty(&self) -> bool {
56 self.buffer.is_empty()
57 }
58
59 pub fn is_full(&self) -> bool {
61 self.buffer.is_full()
62 }
63
64 pub fn read_to_slice(&mut self, buf: &mut [u8]) -> usize {
66 use std::cmp::min;
67
68 let ringbuf: &mut Box<[u8]> = unsafe { mem::transmute(self.buffer.buf.get()) };
69
70 let ringbuf_capacity = ringbuf.len();
71 let ringbuf_len = self.buffer.len.load(Ordering::SeqCst);
72
73 let max_read_size = min(buf.len(), ringbuf_len);
75 let contents_until_end = ringbuf_capacity - self.start;
76 let read_size = min(max_read_size, contents_until_end);
77
78 buf[..read_size].copy_from_slice(&ringbuf[self.start..self.start + read_size]);
79 self.start = (self.start + read_size) % ringbuf_capacity;
80 self.buffer.len.fetch_sub(read_size, Ordering::SeqCst);
81
82 read_size
83 }
84}
85
86impl Read for SpscBufferReader {
87 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
88 Ok(self.read_to_slice(buf))
89 }
90}
91
92unsafe impl Sync for SpscBufferReader {}
93unsafe impl Send for SpscBufferReader {}
94
95pub struct SpscBufferWriter {
97 end: usize,
98 buffer: Arc<SpscBuffer>,
99}
100
101impl SpscBufferWriter {
102 pub fn len(&self) -> usize {
104 self.buffer.len()
105 }
106
107 pub fn capacity(&self) -> usize {
109 self.buffer.capacity()
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.buffer.is_empty()
115 }
116
117 pub fn is_full(&self) -> bool {
119 self.buffer.is_full()
120 }
121
122 pub fn write_from_slice(&mut self, buf: &[u8]) -> usize {
124 use std::cmp::min;
125
126 let ringbuf: &mut Box<[u8]> = unsafe { mem::transmute(self.buffer.buf.get()) };
127
128 let ringbuf_capacity = ringbuf.len();
129 let ringbuf_len = self.buffer.len.load(Ordering::SeqCst);
130
131 let max_write_size = min(buf.len(), ringbuf_capacity - ringbuf_len);
133 let space_until_end = ringbuf_capacity - self.end;
134 let write_size = min(max_write_size, space_until_end);
135
136 ringbuf[self.end..self.end + write_size].copy_from_slice(&buf[..write_size]);
137 self.end = (self.end + write_size) % ringbuf_capacity;
138 self.buffer.len.fetch_add(write_size, Ordering::SeqCst);
139
140 write_size
141 }
142}
143
144unsafe impl Sync for SpscBufferWriter {}
145unsafe impl Send for SpscBufferWriter {}
146
147impl Write for SpscBufferWriter {
148 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
149 Ok(self.write_from_slice(buf))
150 }
151
152 fn flush(&mut self) -> io::Result<()> {
153 Ok(())
154 }
155}
156
157pub fn spsc_buffer(size: usize) -> (SpscBufferWriter, SpscBufferReader) {
169 let buffer = Arc::new(SpscBuffer::new(size));
170
171 let producer = SpscBufferWriter {
172 end: 0,
173 buffer: buffer.clone(),
174 };
175 let consumer = SpscBufferReader { start: 0, buffer };
176
177 (producer, consumer)
178}
179
#[cfg(test)]
mod test {
    use super::*;

    /// Pushes 100 bytes through a 60-byte buffer in two fill/drain rounds and
    /// checks the counters seen by both halves after every step.
    #[test]
    fn test_spsc_buffer() {
        let buf = [1u8; 100];

        let (mut producer, mut consumer) = spsc_buffer(60);

        // Fresh pair: both halves report the same empty state.
        assert!(producer.is_empty());
        assert!(consumer.is_empty());

        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        assert_eq!(producer.capacity(), 60);
        assert_eq!(consumer.capacity(), 60);

        let mut out_buf = [0u8; 100];

        // Round one: only 60 of the 100 offered bytes fit.
        let written = producer.write_from_slice(&buf);
        assert_eq!(written, 60);
        assert_eq!(producer.len(), 60);
        assert_eq!(consumer.len(), 60);

        let drained = consumer.read_to_slice(&mut out_buf);
        assert_eq!(drained, 60);
        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        // Round two: the remaining 40 bytes.
        let written = producer.write_from_slice(&buf[60..]);
        assert_eq!(written, 40);
        assert_eq!(producer.len(), 40);
        assert_eq!(consumer.len(), 40);

        let drained = consumer.read_to_slice(&mut out_buf[60..]);
        assert_eq!(drained, 40);
        assert_eq!(producer.len(), 0);
        assert_eq!(consumer.len(), 0);

        // Everything that went in came out unchanged.
        assert_eq!(&buf[..], &out_buf[..]);
    }
}