pipe/
lib.rs

1#![deny(missing_docs)]
2#![doc(html_root_url = "https://docs.rs/pipe/0.4.0")]
3#![cfg_attr(feature = "unstable-doc-cfg", feature(doc_cfg))]
4
5//! Synchronous in-memory pipe
6//!
7//! ## Example
8//!
9//! ```
10//! use std::thread::spawn;
11//! use std::io::{Read, Write};
12//!
13//! let (mut read, mut write) = pipe::pipe();
14//!
15//! let message = "Hello, world!";
16//! spawn(move || write.write_all(message.as_bytes()).unwrap());
17//!
18//! let mut s = String::new();
19//! read.read_to_string(&mut s).unwrap();
20//!
21//! assert_eq!(&s, message);
22//! ```
23
24#[cfg(feature="readwrite")]
25extern crate readwrite;
26extern crate crossbeam_channel;
27
28use crossbeam_channel::{Sender, Receiver, SendError, TrySendError};
29use std::io::{self, BufRead, Read, Write};
30use std::cmp::min;
31use std::mem::replace;
32use std::hint::unreachable_unchecked;
33
// Default capacity of a `PipeBufWriter` buffer: 8 KiB, the same value libstd
// uses for its buffered readers and writers.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37/// The `Read` end of a pipe (see `pipe()`)
38pub struct PipeReader {
39    receiver: Receiver<Vec<u8>>,
40    buffer: Vec<u8>,
41    position: usize,
42}
43
44/// The `Write` end of a pipe (see `pipe()`)
45#[derive(Clone)]
46pub struct PipeWriter {
47    sender: Sender<Vec<u8>>
48}
49
50/// The `Write` end of a pipe (see `pipe()`) that will buffer small writes before sending
51/// to the reader end.
52pub struct PipeBufWriter {
53    sender: Option<Sender<Vec<u8>>>,
54    buffer: Vec<u8>,
55    size: usize,
56}
57
58/// Creates a synchronous memory pipe
59pub fn pipe() -> (PipeReader, PipeWriter) {
60    let (sender, receiver) = crossbeam_channel::bounded(0);
61
62    (
63        PipeReader { receiver, buffer: Vec::new(), position: 0 },
64        PipeWriter { sender },
65    )
66}
67
68/// Creates a synchronous memory pipe with buffered writer
69pub fn pipe_buffered() -> (PipeReader, PipeBufWriter) {
70    let (tx, rx) = crossbeam_channel::bounded(0);
71
72    (PipeReader { receiver: rx, buffer: Vec::new(), position: 0 }, PipeBufWriter { sender: Some(tx), buffer: Vec::with_capacity(DEFAULT_BUF_SIZE), size: DEFAULT_BUF_SIZE } )
73}
74
/// Creates a pair of connected endpoints for bidirectional communication, a bit
/// like UNIX's `socketpair(2)`.
///
/// Two pipes are created and cross-wired: whatever is written to one endpoint
/// can be read from the other.
#[cfg(feature = "bidirectional")]
#[cfg_attr(feature = "unstable-doc-cfg", doc(cfg(feature = "bidirectional")))]
pub fn bipipe() -> (readwrite::ReadWrite<PipeReader, PipeWriter>, readwrite::ReadWrite<PipeReader, PipeWriter>) {
    let (reader_a, writer_a) = pipe();
    let (reader_b, writer_b) = pipe();

    // Each endpoint reads from one pipe and writes into the other.
    let first = (reader_a, writer_b).into();
    let second = (reader_b, writer_a).into();

    (first, second)
}
83
/// Creates a pair of connected endpoints for bidirectional communication using
/// buffered writers, a bit like UNIX's `socketpair(2)`.
///
/// Two buffered pipes are created and cross-wired: whatever is written to one
/// endpoint can be read from the other.
#[cfg(feature = "bidirectional")]
#[cfg_attr(feature = "unstable-doc-cfg", doc(cfg(feature = "bidirectional")))]
pub fn bipipe_buffered() -> (readwrite::ReadWrite<PipeReader, PipeBufWriter>, readwrite::ReadWrite<PipeReader, PipeBufWriter>) {
    let (reader_a, writer_a) = pipe_buffered();
    let (reader_b, writer_b) = pipe_buffered();

    // Each endpoint reads from one pipe and writes into the other.
    let first = (reader_a, writer_b).into();
    let second = (reader_b, writer_a).into();

    (first, second)
}
92
// Builds the `BrokenPipe` error reported when a send to the reader fails.
fn epipe() -> io::Error {
    let kind = io::ErrorKind::BrokenPipe;
    io::Error::new(kind, "pipe reader has been dropped")
}
96
97impl PipeWriter {
98    /// Extracts the inner `Sender` from the writer
99    pub fn into_inner(self) -> Sender<Vec<u8>> {
100        self.sender
101    }
102
103    /// Gets a reference to the underlying `Sender`
104    pub fn sender(&self) -> &Sender<Vec<u8>> {
105        &self.sender
106    }
107
108    /// Write data to the associated `PipeReader`
109    pub fn send<B: Into<Vec<u8>>>(&self, bytes: B) -> io::Result<()> {
110        self.sender.send(bytes.into())
111            .map_err(|_| epipe())
112            .map(drop)
113    }
114}
115
116impl PipeBufWriter {
117    /// Extracts the inner `Sender` from the writer, and any pending buffered data
118    pub fn into_inner(mut self) -> (Sender<Vec<u8>>, Vec<u8>) {
119        let sender = match replace(&mut self.sender, None) {
120            Some(sender) => sender,
121            None => unsafe {
122                // SAFETY: this is safe as long as `into_inner()` is the only method
123                // that clears the sender
124                unreachable_unchecked()
125            },
126        };
127        (sender, replace(&mut self.buffer, Vec::new()))
128    }
129
130    #[inline]
131    /// Gets a reference to the underlying `Sender`
132    pub fn sender(&self) -> &Sender<Vec<u8>> {
133        match &self.sender {
134            Some(sender) => sender,
135            None => unsafe {
136                // SAFETY: this is safe as long as `into_inner()` is the only method
137                // that clears the sender, and this fn is never called afterward
138                unreachable_unchecked()
139            },
140        }
141    }
142
143    /// Returns a reference to the internally buffered data.
144    pub fn buffer(&self) -> &[u8] {
145        &self.buffer
146    }
147
148    /// Returns the number of bytes the internal buffer can hold without flushing.
149    pub fn capacity(&self) -> usize {
150        self.size
151    }
152}
153
154/// Creates a new handle to the `PipeBufWriter` with a fresh new buffer. Any pending data is still
155/// owned by the existing writer and should be flushed if necessary.
156impl Clone for PipeBufWriter {
157    fn clone(&self) -> Self {
158        Self {
159            sender: self.sender.clone(),
160            buffer: Vec::with_capacity(self.size),
161            size: self.size,
162        }
163    }
164}
165
166impl PipeReader {
167    /// Extracts the inner `Receiver` from the writer, and any pending buffered data
168    pub fn into_inner(mut self) -> (Receiver<Vec<u8>>, Vec<u8>) {
169        self.buffer.drain(..self.position);
170        (self.receiver, self.buffer)
171    }
172
173    /// Returns a reference to the internally buffered data.
174    pub fn buffer(&self) -> &[u8] {
175        &self.buffer[self.position..]
176    }
177}
178
179/// Creates a new handle to the `PipeReader` with a fresh new buffer. Any pending data is still
180/// owned by the existing reader and will not be accessible from the new handle.
181impl Clone for PipeReader {
182    fn clone(&self) -> Self {
183        Self {
184            receiver: self.receiver.clone(),
185            buffer: Vec::new(),
186            position: 0,
187        }
188    }
189}
190
191impl BufRead for PipeReader {
192    fn fill_buf(&mut self) -> io::Result<&[u8]> {
193        while self.position >= self.buffer.len() {
194            match self.receiver.recv() {
195                // The only existing error is EOF
196                Err(_) => break,
197                Ok(data) => {
198                    self.buffer = data;
199                    self.position = 0;
200                }
201            }
202        }
203
204        Ok(&self.buffer[self.position..])
205    }
206
207    fn consume(&mut self, amt: usize) {
208        debug_assert!(self.buffer.len() - self.position >= amt);
209        self.position += amt
210    }
211}
212
213impl Read for PipeReader {
214    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
215        if buf.is_empty() {
216            return Ok(0);
217        }
218
219        let internal = self.fill_buf()?;
220
221        let len = min(buf.len(), internal.len());
222        if len > 0 {
223            buf[..len].copy_from_slice(&internal[..len]);
224            self.consume(len);
225        }
226        Ok(len)
227    }
228}
229
230impl Write for &'_ PipeWriter {
231    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
232        let data = buf.to_vec();
233
234        self.send(data)
235            .map(|_| buf.len())
236    }
237
238    fn flush(&mut self) -> io::Result<()> {
239        Ok(())
240    }
241}
242
243impl Write for PipeWriter {
244    #[inline]
245    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
246        Write::write(&mut &*self, buf)
247    }
248
249    #[inline]
250    fn flush(&mut self) -> io::Result<()> {
251        Write::flush(&mut &*self)
252    }
253}
254
255impl Write for PipeBufWriter {
256    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
257        let buffer_len = self.buffer.len();
258        let bytes_written = if buf.len() > self.size {
259            // bypass buffering for big writes
260            buf.len()
261        } else {
262            // avoid resizing of the buffer
263            min(buf.len(), self.size - buffer_len)
264        };
265        self.buffer.extend_from_slice(&buf[..bytes_written]);
266
267        if self.buffer.len() >= self.size {
268            self.flush()?;
269        } else {
270            // reserve capacity later to avoid needless allocations
271            let data = replace(&mut self.buffer, Vec::new());
272
273            // buffer still has space but try to send it in case the other side already awaits
274            match self.sender().try_send(data) {
275                Ok(_) => self.buffer.reserve(self.size),
276                Err(TrySendError::Full(data)) =>
277                    self.buffer = data,
278                Err(TrySendError::Disconnected(data)) => {
279                    self.buffer = data;
280                    self.buffer.truncate(buffer_len);
281                    return Err(epipe())
282                },
283            }
284        }
285
286        Ok(bytes_written)
287    }
288
289    fn flush(&mut self) -> io::Result<()> {
290        if self.buffer.is_empty() {
291            Ok(())
292        } else {
293            let data = replace(&mut self.buffer, Vec::new());
294            match self.sender().send(data) {
295                Ok(_) => {
296                    self.buffer.reserve(self.size);
297                    Ok(())
298                },
299                Err(SendError(data)) => {
300                    self.buffer = data;
301                    Err(epipe())
302                },
303            }
304        }
305    }
306}
307
308/// Flushes the contents of the buffer before the writer is dropped. Errors are ignored, so it is
309/// recommended that `flush()` be used explicitly instead of relying on Drop.
310///
311/// This final flush can be avoided by using `drop(writer.into_inner())`.
312impl Drop for PipeBufWriter {
313    fn drop(&mut self) {
314        if !self.buffer.is_empty() {
315            let data = replace(&mut self.buffer, Vec::new());
316            let _ = self.sender().send(data);
317        }
318    }
319}
320
#[cfg(test)]
mod tests {
    use std::thread::spawn;
    use std::io::{Read, Write};
    use super::*;

    // Size in bytes of every block written by the `small_reads*` tests.
    const BLOCK: usize = 20;
    // Number of blocks written by the `small_reads*` tests.
    const BLOCK_COUNT: usize = 20;

    // Reads from `reader` in steps of half a block until EOF or an error and
    // returns the total number of bytes read.
    fn count_small_reads(reader: &mut PipeReader) -> usize {
        let mut scratch = [0; BLOCK / 2];
        let mut total = 0;
        loop {
            match reader.read(&mut scratch) {
                // 0 means EOF
                Ok(0) | Err(_) => break,
                Ok(size) => total += size,
            }
        }
        total
    }

    // Data written in two parts arrives complete and in order.
    #[test]
    fn pipe_reader() {
        let message = b"hello there";
        let (mut reader, mut writer) = pipe();
        let producer = spawn(move || {
            let (head, tail) = message.split_at(5);
            writer.write_all(head).unwrap();
            writer.write_all(tail).unwrap();
            drop(writer);
        });

        let mut received = Vec::with_capacity(message.len());
        reader.read_to_end(&mut received).unwrap();
        assert_eq!(message, &received[..]);

        producer.join().unwrap();
    }

    // Writing fails once the reader has been dropped.
    #[test]
    fn pipe_writer_fail() {
        let message = b"hi";
        let (reader, mut writer) = pipe();
        let dropper = spawn(move || {
            drop(reader);
        });

        let outcome = writer.write_all(message);
        assert!(outcome.is_err());

        dropper.join().unwrap();
    }

    // Reads smaller than the written blocks still see every byte.
    #[test]
    fn small_reads() {
        let (mut reader, mut writer) = pipe();
        let producer = spawn(move || {
            let block = [0; BLOCK];
            for _ in 0..BLOCK_COUNT {
                writer.write_all(&block).unwrap();
            }
        });

        let total = count_small_reads(&mut reader);
        assert_eq!(BLOCK_COUNT * BLOCK, total);

        producer.join().unwrap();
    }

    // Same as `pipe_reader`, using the buffered writer.
    #[test]
    fn pipe_reader_buffered() {
        let message = b"hello there";
        let (mut reader, mut writer) = pipe_buffered();
        let producer = spawn(move || {
            let (head, tail) = message.split_at(5);
            writer.write_all(head).unwrap();
            writer.write_all(tail).unwrap();
            writer.flush().unwrap();
            drop(writer);
        });

        let mut received = Vec::with_capacity(message.len());
        reader.read_to_end(&mut received).unwrap();
        assert_eq!(message, &received[..]);

        producer.join().unwrap();
    }

    // A write larger than the buffer fails once the reader has been dropped.
    #[test]
    fn pipe_writer_fail_buffered() {
        let payload = vec![0u8; DEFAULT_BUF_SIZE * 2];
        let (reader, mut writer) = pipe_buffered();
        let dropper = spawn(move || {
            drop(reader);
        });

        let outcome = writer.write_all(&payload);
        assert!(outcome.is_err());

        dropper.join().unwrap();
    }

    // Same as `small_reads`, using the buffered writer.
    #[test]
    fn small_reads_buffered() {
        let (mut reader, mut writer) = pipe_buffered();
        let producer = spawn(move || {
            let block = [0; BLOCK];
            for _ in 0..BLOCK_COUNT {
                writer.write_all(&block).unwrap();
            }
            writer.flush().unwrap();
        });

        let total = count_small_reads(&mut reader);
        assert_eq!(BLOCK_COUNT * BLOCK, total);

        producer.join().unwrap();
    }
}