1use std::collections::VecDeque;
2use std::fmt::Arguments;
3use std::io::{BufRead, IoSlice};
4use std::io::{Error, ErrorKind, Read, Result as IOResult, Write};
5
6use loole::{Receiver, Sender, unbounded};
7
8use crate::state::SharedState;
9
10pub fn pipe() -> (Writer, Reader) {
32 let (sender, receiver) = unbounded();
33
34 let state = SharedState::default();
35
36 (
37 Writer {
38 sender,
39 state: state.clone(),
40 },
41 Reader {
42 receiver,
43 state,
44 buf: VecDeque::new(),
45 },
46 )
47}
48
49#[derive(Clone, Debug)]
70pub struct Writer {
71 pub(crate) sender: Sender<()>,
72 pub(crate) state: SharedState,
73}
74
75impl Write for Writer {
76 fn write(&mut self, buf: &[u8]) -> IOResult<usize> {
77 let n = self.state.write(buf)?;
78 match self.sender.send(()) {
79 Ok(_) => Ok(n),
80 Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
81 }
82 }
83
84 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
85 let n = self.state.write_vectored(bufs)?;
86 match self.sender.send(()) {
87 Ok(_) => Ok(n),
88 Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
89 }
90 }
91
92 fn flush(&mut self) -> IOResult<()> {
93 self.state.flush()
94 }
95
96 fn write_all(&mut self, buf: &[u8]) -> IOResult<()> {
97 self.state.write_all(buf)?;
98 match self.sender.send(()) {
99 Ok(_) => Ok(()),
100 Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
101 }
102 }
103
104 fn write_fmt(&mut self, fmt: Arguments<'_>) -> IOResult<()> {
105 self.state.write_fmt(fmt)?;
106 match self.sender.send(()) {
107 Ok(_) => Ok(()),
108 Err(e) => Err(Error::new(ErrorKind::WriteZero, e)),
109 }
110 }
111}
112
113#[derive(Debug)]
137pub struct Reader {
138 pub(crate) receiver: Receiver<()>,
139 pub(crate) buf: VecDeque<u8>,
140 pub(crate) state: SharedState,
141}
142
143impl BufRead for Reader {
144 fn fill_buf(&mut self) -> IOResult<&[u8]> {
145 while self.buf.is_empty() {
146 let n = self.state.copy_to(&mut self.buf)?;
147 if n == 0 && self.receiver.recv().is_err() {
148 break;
149 }
150 }
151
152 self.buf.fill_buf()
153 }
154
155 fn consume(&mut self, amt: usize) {
156 self.buf.consume(amt)
157 }
158}
159
160impl Read for Reader {
161 fn read(&mut self, mut buf: &mut [u8]) -> IOResult<usize> {
162 let n = buf.write(self.fill_buf()?)?;
163 self.consume(n);
164 Ok(n)
165 }
166}
167#[cfg(test)]
168mod tests {
169 use std::io::{BufRead, IoSlice, Read, Write, read_to_string};
170 use std::thread::spawn;
171
172 #[test]
173 fn base_write_case() {
174 let (mut writer, reader) = crate::pipe();
176 for _ in 0..1000 {
177 writer.write_all("hello".as_bytes()).unwrap();
178 }
179 drop(reader)
180 }
181
182 #[test]
183 fn base_read_case() {
184 let (mut writer, reader) = crate::pipe();
185 writer.write_all("hello ".as_bytes()).unwrap();
186 writer.write_all("world".as_bytes()).unwrap();
187 drop(writer);
188
189 assert_eq!("hello world".to_string(), read_to_string(reader).unwrap());
190 }
191 #[test]
192 fn base_vectored_case() {
193 let (mut writer, reader) = crate::pipe();
194 _ = writer
195 .write_vectored(&[
196 IoSlice::new("hello ".as_bytes()),
197 IoSlice::new("world".as_bytes()),
198 ])
199 .unwrap();
200 drop(writer);
201
202 assert_eq!("hello world".to_string(), read_to_string(reader).unwrap());
203 }
204
205 #[test]
206 fn thread_case() {
207 let (writer, reader) = crate::pipe();
208 for _ in 0..1000 {
209 let mut writer = writer.clone();
210 spawn(move || {
211 writer.write_all("hello".as_bytes()).unwrap();
212 });
213 }
214 drop(writer);
215
216 assert_eq!("hello".repeat(1000), read_to_string(reader).unwrap());
217 }
218
219 #[test]
220 fn writer_err_case() {
221 let (mut writer, reader) = crate::pipe();
222 drop(reader);
223
224 assert!(writer.write("hello".as_bytes()).is_err());
225 }
226
227 #[test]
228 fn bufread_case() {
229 let (mut writer, mut reader) = crate::pipe();
230 writer.write_all("hello\n".as_bytes()).unwrap();
231 writer.write_all("world".as_bytes()).unwrap();
232 drop(writer);
233
234 let mut str = String::new();
235 assert_ne!(0, reader.read_line(&mut str).unwrap());
236 assert_eq!("hello\n".to_string(), str);
237
238 let mut str = String::new();
239 assert_ne!(0, reader.read_line(&mut str).unwrap());
240 assert_eq!("world".to_string(), str);
241
242 let mut str = String::new();
243 assert_eq!(0, reader.read_line(&mut str).unwrap());
244 }
245
246 #[test]
247 fn bufread_lines_case() {
248 let (mut writer, reader) = crate::pipe();
249 writer.write_all("hello\n".as_bytes()).unwrap();
250 writer.write_all("world".as_bytes()).unwrap();
251 drop(writer);
252
253 assert_eq!(2, reader.lines().map(|l| assert!(l.is_ok())).count())
254 }
255
256 #[test]
257 fn threads_write_and_read_case() {
258 let (writer, mut reader) = crate::pipe();
259
260 for _ in 0..1000 {
261 let mut writer = writer.clone();
262 spawn(move || {
263 writer.write_all(&[0; 4]).unwrap();
264 });
265
266 let mut buf = [0; 4];
267 assert_eq!(buf.len(), reader.read(&mut buf).unwrap());
268 }
269 drop(writer);
270 }
271}