1#![deny(missing_docs)]
2#![doc(html_root_url = "https://docs.rs/pipe/0.4.0")]
3#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))]
4
5#[cfg(feature="readwrite")]
25extern crate readwrite;
26extern crate crossbeam_channel;
27
28use crossbeam_channel::{Sender, Receiver, SendError, TrySendError};
29use std::io::{self, BufRead, Read, Write};
30use std::cmp::min;
31use std::mem::replace;
32use std::hint::unreachable_unchecked;
33
34const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37pub struct PipeReader {
39 receiver: Receiver<Vec<u8>>,
40 buffer: Vec<u8>,
41 position: usize,
42}
43
44#[derive(Clone)]
46pub struct PipeWriter {
47 sender: Sender<Vec<u8>>
48}
49
50pub struct PipeBufWriter {
53 sender: Option<Sender<Vec<u8>>>,
54 buffer: Vec<u8>,
55 size: usize,
56}
57
58pub fn pipe() -> (PipeReader, PipeWriter) {
60 let (sender, receiver) = crossbeam_channel::bounded(0);
61
62 (
63 PipeReader { receiver, buffer: Vec::new(), position: 0 },
64 PipeWriter { sender },
65 )
66}
67
68pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) {
70 let (tx, rx) = crossbeam_channel::bounded(0);
71
72 (PipeReader { receiver: rx, buffer: Vec::new(), position: 0 }, PipeBufWriter { sender: Some(tx), buffer: Vec::with_capacity(DEFAULT_BUF_SIZE), size: DEFAULT_BUF_SIZE } )
73}
74
75#[cfg(feature = "bidirectional")]
77#[cfg_attr(feature = "unstable-doc-cfg", doc(cfg(feature = "bidirectional")))]
78pub fn bipipe() -> (readwrite::ReadWrite<PipeReader, PipeWriter>, readwrite::ReadWrite<PipeReader, PipeWriter>) {
79 let (r1,w1) = pipe();
80 let (r2,w2) = pipe();
81 ((r1,w2).into(), (r2,w1).into())
82}
83
84#[cfg(feature = "bidirectional")]
86#[cfg_attr(feature = "unstable-doc-cfg", doc(cfg(feature = "bidirectional")))]
87pub fn bipipe_buffered() -> (readwrite::ReadWrite<PipeReader, PipeBufWriter>, readwrite::ReadWrite<PipeReader, PipeBufWriter>) {
88 let (r1,w1) = pipe_buffered();
89 let (r2,w2) = pipe_buffered();
90 ((r1,w2).into(), (r2,w1).into())
91}
92
93fn epipe() -> io::Error {
94 io::Error::new(io::ErrorKind::BrokenPipe, "pipe reader has been dropped")
95}
96
97impl PipeWriter {
98 pub fn into_inner(self) -> Sender<Vec<u8>> {
100 self.sender
101 }
102
103 pub fn sender(&self) -> &Sender<Vec<u8>> {
105 &self.sender
106 }
107
108 pub fn send<B: Into<Vec<u8>>>(&self, bytes: B) -> io::Result<()> {
110 self.sender.send(bytes.into())
111 .map_err(|_| epipe())
112 .map(drop)
113 }
114}
115
116impl PipeBufWriter {
117 pub fn into_inner(mut self) -> (Sender<Vec<u8>>, Vec<u8>) {
119 let sender = match replace(&mut self.sender, None) {
120 Some(sender) => sender,
121 None => unsafe {
122 unreachable_unchecked()
125 },
126 };
127 (sender, replace(&mut self.buffer, Vec::new()))
128 }
129
130 #[inline]
131 pub fn sender(&self) -> &Sender<Vec<u8>> {
133 match &self.sender {
134 Some(sender) => sender,
135 None => unsafe {
136 unreachable_unchecked()
139 },
140 }
141 }
142
143 pub fn buffer(&self) -> &[u8] {
145 &self.buffer
146 }
147
148 pub fn capacity(&self) -> usize {
150 self.size
151 }
152}
153
154impl Clone for PipeBufWriter {
157 fn clone(&self) -> Self {
158 Self {
159 sender: self.sender.clone(),
160 buffer: Vec::with_capacity(self.size),
161 size: self.size,
162 }
163 }
164}
165
166impl PipeReader {
167 pub fn into_inner(mut self) -> (Receiver<Vec<u8>>, Vec<u8>) {
169 self.buffer.drain(..self.position);
170 (self.receiver, self.buffer)
171 }
172
173 pub fn buffer(&self) -> &[u8] {
175 &self.buffer[self.position..]
176 }
177}
178
179impl Clone for PipeReader {
182 fn clone(&self) -> Self {
183 Self {
184 receiver: self.receiver.clone(),
185 buffer: Vec::new(),
186 position: 0,
187 }
188 }
189}
190
191impl BufRead for PipeReader {
192 fn fill_buf(&mut self) -> io::Result<&[u8]> {
193 while self.position >= self.buffer.len() {
194 match self.receiver.recv() {
195 Err(_) => break,
197 Ok(data) => {
198 self.buffer = data;
199 self.position = 0;
200 }
201 }
202 }
203
204 Ok(&self.buffer[self.position..])
205 }
206
207 fn consume(&mut self, amt: usize) {
208 debug_assert!(self.buffer.len() - self.position >= amt);
209 self.position += amt
210 }
211}
212
213impl Read for PipeReader {
214 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
215 if buf.is_empty() {
216 return Ok(0);
217 }
218
219 let internal = self.fill_buf()?;
220
221 let len = min(buf.len(), internal.len());
222 if len > 0 {
223 buf[..len].copy_from_slice(&internal[..len]);
224 self.consume(len);
225 }
226 Ok(len)
227 }
228}
229
230impl Write for &'_ PipeWriter {
231 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
232 let data = buf.to_vec();
233
234 self.send(data)
235 .map(|_| buf.len())
236 }
237
238 fn flush(&mut self) -> io::Result<()> {
239 Ok(())
240 }
241}
242
243impl Write for PipeWriter {
244 #[inline]
245 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
246 Write::write(&mut &*self, buf)
247 }
248
249 #[inline]
250 fn flush(&mut self) -> io::Result<()> {
251 Write::flush(&mut &*self)
252 }
253}
254
255impl Write for PipeBufWriter {
256 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
257 let buffer_len = self.buffer.len();
258 let bytes_written = if buf.len() > self.size {
259 buf.len()
261 } else {
262 min(buf.len(), self.size - buffer_len)
264 };
265 self.buffer.extend_from_slice(&buf[..bytes_written]);
266
267 if self.buffer.len() >= self.size {
268 self.flush()?;
269 } else {
270 let data = replace(&mut self.buffer, Vec::new());
272
273 match self.sender().try_send(data) {
275 Ok(_) => self.buffer.reserve(self.size),
276 Err(TrySendError::Full(data)) =>
277 self.buffer = data,
278 Err(TrySendError::Disconnected(data)) => {
279 self.buffer = data;
280 self.buffer.truncate(buffer_len);
281 return Err(epipe())
282 },
283 }
284 }
285
286 Ok(bytes_written)
287 }
288
289 fn flush(&mut self) -> io::Result<()> {
290 if self.buffer.is_empty() {
291 Ok(())
292 } else {
293 let data = replace(&mut self.buffer, Vec::new());
294 match self.sender().send(data) {
295 Ok(_) => {
296 self.buffer.reserve(self.size);
297 Ok(())
298 },
299 Err(SendError(data)) => {
300 self.buffer = data;
301 Err(epipe())
302 },
303 }
304 }
305 }
306}
307
308impl Drop for PipeBufWriter {
313 fn drop(&mut self) {
314 if !self.buffer.is_empty() {
315 let data = replace(&mut self.buffer, Vec::new());
316 let _ = self.sender().send(data);
317 }
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use std::thread::spawn;
324 use std::io::{Read, Write};
325 use super::*;
326
327 #[test]
328 fn pipe_reader() {
329 let i = b"hello there";
330 let mut o = Vec::with_capacity(i.len());
331 let (mut r, mut w) = pipe();
332 let guard = spawn(move || {
333 w.write_all(&i[..5]).unwrap();
334 w.write_all(&i[5..]).unwrap();
335 drop(w);
336 });
337
338 r.read_to_end(&mut o).unwrap();
339 assert_eq!(i, &o[..]);
340
341 guard.join().unwrap();
342 }
343
344 #[test]
345 fn pipe_writer_fail() {
346 let i = b"hi";
347 let (r, mut w) = pipe();
348 let guard = spawn(move || {
349 drop(r);
350 });
351
352 assert!(w.write_all(i).is_err());
353
354 guard.join().unwrap();
355 }
356
357 #[test]
358 fn small_reads() {
359 let block_cnt = 20;
360 const BLOCK: usize = 20;
361 let (mut r, mut w) = pipe();
362 let guard = spawn(move || {
363 for _ in 0..block_cnt {
364 let data = &[0; BLOCK];
365 w.write_all(data).unwrap();
366 }
367 });
368
369 let mut buff = [0; BLOCK / 2];
370 let mut read = 0;
371 while let Ok(size) = r.read(&mut buff) {
372 if size == 0 {
374 break;
375 }
376 read += size;
377 }
378 assert_eq!(block_cnt * BLOCK, read);
379
380 guard.join().unwrap();
381 }
382
383 #[test]
384 fn pipe_reader_buffered() {
385 let i = b"hello there";
386 let mut o = Vec::with_capacity(i.len());
387 let (mut r, mut w) = pipe_buffered();
388 let guard = spawn(move || {
389 w.write_all(&i[..5]).unwrap();
390 w.write_all(&i[5..]).unwrap();
391 w.flush().unwrap();
392 drop(w);
393 });
394
395 r.read_to_end(&mut o).unwrap();
396 assert_eq!(i, &o[..]);
397
398 guard.join().unwrap();
399 }
400
401 #[test]
402 fn pipe_writer_fail_buffered() {
403 let i = &[0; DEFAULT_BUF_SIZE * 2];
404 let (r, mut w) = pipe_buffered();
405 let guard = spawn(move || {
406 drop(r);
407 });
408
409 assert!(w.write_all(i).is_err());
410
411 guard.join().unwrap();
412 }
413
414 #[test]
415 fn small_reads_buffered() {
416 let block_cnt = 20;
417 const BLOCK: usize = 20;
418 let (mut r, mut w) = pipe_buffered();
419 let guard = spawn(move || {
420 for _ in 0..block_cnt {
421 let data = &[0; BLOCK];
422 w.write_all(data).unwrap();
423 }
424 w.flush().unwrap();
425 });
426
427 let mut buff = [0; BLOCK / 2];
428 let mut read = 0;
429 while let Ok(size) = r.read(&mut buff) {
430 if size == 0 {
432 break;
433 }
434 read += size;
435 }
436 assert_eq!(block_cnt * BLOCK, read);
437
438 guard.join().unwrap();
439 }
440}