tokio_stdin_stdout/
lib.rs

1#![deny(missing_docs)]
2
//! A bridge between [std::io::std{in,out}][1] and the futures `AsyncRead`/`AsyncWrite` world.
//!
//! [1]: https://doc.rust-lang.org/std/io/fn.stdin.html
5//!
6//! Example:
7//!
8//! ```rust,no_run
9//! extern crate tokio_core;
10//! extern crate tokio_io;
11//! extern crate tokio_stdin_stdout;
12//!
13//! let mut core = tokio_core::reactor::Core::new().unwrap();
14//!
15//! let stdin = tokio_stdin_stdout::stdin(0);
16//! let stdout = tokio_stdin_stdout::stdout(0);
17//!
18//! core.run(tokio_io::io::copy(stdin, stdout)).unwrap();
19//! ```
20//!
//! It works by starting separate threads, which do the actual synchronous I/O and communicate with
//! the asynchronous world using [futures::sync::mpsc](http://alexcrichton.com/futures-rs/futures/sync/mpsc/index.html).
23//!
//! On Unix (Linux, OS X) it is better to use [tokio-file-unix](https://crates.io/crates/tokio-file-unix).
25//!
26//! Concerns:
27//!
//! * stdin/stdout are not expected to be used in the normal (direct) way after calling functions from this crate
29//! * Allocation-heavy.
30//! * All errors collapsed to ErrorKind::Other (for stdout) or ErrorKind::BrokenPipe (for stdin)
31//! * Failure to write to stdout is only seen after attempting to send there about 3 more buffers.
32
33extern crate futures;
34extern crate tokio_io;
35
/// Size in bytes of the scratch buffer the stdin reader thread reads into.
const BUFSIZ: usize = 8192;
37
38use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
39use std::cell::RefCell;
40use std::io::{Error, ErrorKind, Read, Result, Write};
41use std::rc::Rc;
42use std::sync::{Arc, LockResult, Mutex, MutexGuard, PoisonError, TryLockError, TryLockResult};
43use std::thread::JoinHandle;
44use tokio_io::{AsyncRead, AsyncWrite};
45
/// Receiving half of a channel carrying boxed byte chunks.
type BBR = futures::sync::mpsc::Receiver<Box<[u8]>>;
/// Sending half of a channel carrying boxed byte chunks.
type BBS = futures::sync::mpsc::Sender<Box<[u8]>>;
48
/// Asynchronous stdin
///
/// Created by the `stdin` constructor function; implements `Read` and `AsyncRead`.
pub struct ThreadedStdin {
    // Tail of a previously received chunk that did not fit into the caller's
    // buffer; it is handed out first on the next `read`.
    debt: Option<Box<[u8]>>,
    // Receiving end of the channel that delivers chunks to `read`.
    rcv: BBR,
}
54
impl ThreadedStdin {
    /// Wrap into `Arc<Mutex>` to make it clonable and sendable
    ///
    /// Consumes `self`; equivalent to `SendableStdin::new(self)`.
    pub fn make_sendable(self) -> SendableStdin {
        SendableStdin::new(self)
    }
    /// Wrap into `Rc<RefCell>` to make it clonable
    ///
    /// Consumes `self`; equivalent to `ClonableStdin::new(self)`.
    pub fn make_clonable(self) -> ClonableStdin {
        ClonableStdin::new(self)
    }
}
65
66/// Constructor for the `ThreadedStdin`
67pub fn stdin(queue_size: usize) -> ThreadedStdin {
68    let (snd_, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
69    std::thread::spawn(move || {
70        let mut snd = snd_;
71        let sin = ::std::io::stdin();
72        let mut sin_lock = sin.lock();
73        let mut buf = vec![0; BUFSIZ];
74        while let Ok(ret) = sin_lock.read(&mut buf[..]) {
75            let content = buf[0..ret].to_vec().into_boxed_slice();
76            snd = match snd.send(content).wait() {
77                Ok(x) => x,
78                Err(_) => break,
79            }
80        }
81    });
82    ThreadedStdin { debt: None, rcv }
83}
84
85impl AsyncRead for ThreadedStdin {}
86impl Read for ThreadedStdin {
87    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
88        let mut handle_the_buffer = |incoming_buf: Box<[u8]>| {
89            let l = buf.len();
90            let dl = incoming_buf.len();
91            if l >= dl {
92                buf[0..dl].copy_from_slice(&incoming_buf);
93                (None, Ok(dl))
94            } else {
95                buf[0..l].copy_from_slice(&incoming_buf[0..l]);
96                let newdebt = Some(incoming_buf[l..].to_vec().into_boxed_slice());
97                (newdebt, Ok(l))
98            }
99        };
100
101        let (new_debt, ret) = if let Some(debt) = self.debt.take() {
102            handle_the_buffer(debt)
103        } else {
104            match self.rcv.poll() {
105                Ok(Async::Ready(Some(newbuf))) => handle_the_buffer(newbuf),
106                Ok(Async::Ready(None)) => (None, Err(ErrorKind::BrokenPipe.into())),
107                Ok(Async::NotReady) => (None, Err(ErrorKind::WouldBlock.into())),
108                Err(_) => (None, Err(ErrorKind::Other.into())),
109            }
110        };
111        self.debt = new_debt;
112        ret
113    }
114}
115
/// Asynchronous stdout
///
/// Created by the `stdout` (or `stderr`) constructor function; implements
/// `Write` and `AsyncWrite`.
pub struct ThreadedStdout {
    // Sending end of the channel consumed by the writer thread.
    snd: BBS,
    // Handle of the writer thread; taken out and joined by `shutdown`.
    jh: Option<JoinHandle<()>>,
}
121
impl ThreadedStdout {
    /// Wrap into `Arc<Mutex>` to make it clonable and sendable
    ///
    /// Consumes `self`; equivalent to `SendableStdout::new(self)`.
    pub fn make_sendable(self) -> SendableStdout {
        SendableStdout::new(self)
    }
    /// Wrap into `Rc<RefCell>` to make it clonable
    ///
    /// Consumes `self`; equivalent to `ClonableStdout::new(self)`.
    pub fn make_clonable(self) -> ClonableStdout {
        ClonableStdout::new(self)
    }
}
132/// Constructor for the `ThreadedStdout`
133pub fn stdout(queue_size: usize) -> ThreadedStdout {
134    let (snd, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
135    let jh = std::thread::spawn(move || {
136        let sout = ::std::io::stdout();
137        let mut sout_lock = sout.lock();
138        for b in rcv.wait() {
139            if let Ok(b) = b {
140                if b.len() == 0 {
141                    break;
142                }
143                if sout_lock.write_all(&b).is_err() {
144                    break;
145                }
146                if sout_lock.flush().is_err() {
147                    break;
148                }
149            } else {
150                break;
151            }
152        }
153        let _ = sout_lock.write(&[]);
154    });
155    ThreadedStdout { snd, jh: Some(jh) }
156}
157
158impl AsyncWrite for ThreadedStdout {
159    fn shutdown(&mut self) -> Poll<(), Error> {
160        // Signal the thread to exit.
161        match self.snd.start_send(vec![].into_boxed_slice()) {
162            Ok(AsyncSink::Ready) => (),
163            Ok(AsyncSink::NotReady(_)) => return Ok(Async::NotReady),
164            Err(_) => {
165                return Ok(Async::Ready(()));
166            }
167        };
168        match self.snd.poll_complete() {
169            Ok(Async::Ready(_)) => (),
170            Ok(Async::NotReady) => return Ok(Async::NotReady),
171            Err(e) => {
172                return Ok(Async::Ready(()));
173            }
174        };
175        if self.snd.close().is_err() {
176            return Ok(Async::Ready(()));
177        };
178        if let Some(jh) = self.jh.take() {
179            if jh.join().is_err() {
180                return Err(ErrorKind::Other.into());
181            };
182        }
183        Ok(Async::Ready(()))
184    }
185}
186impl Write for ThreadedStdout {
187    fn write(&mut self, buf: &[u8]) -> Result<usize> {
188        if buf.is_empty() {
189            return Ok(0);
190        }
191
192        match self.snd.start_send(buf.to_vec().into_boxed_slice()) {
193            Ok(AsyncSink::Ready) => (),
194            Ok(AsyncSink::NotReady(_)) => return Err(ErrorKind::WouldBlock.into()),
195            Err(_) => return Err(ErrorKind::Other.into()),
196        }
197
198        Ok(buf.len())
199    }
200    fn flush(&mut self) -> Result<()> {
201        match self.snd.poll_complete() {
202            Ok(Async::Ready(_)) => Ok(()),
203            Ok(Async::NotReady) => Err(ErrorKind::WouldBlock.into()),
204            Err(_) => Err(ErrorKind::Other.into()),
205        }
206    }
207}
208
209// XXX code duplication:
210
/// Asynchronous stderr
///
/// This is the same type as `ThreadedStdout`; use the `stderr` constructor
/// function to obtain one that writes to standard error.
pub type ThreadedStderr = ThreadedStdout;
213/// Constructor for the `ThreadedStderr`
214pub fn stderr(queue_size: usize) -> ThreadedStderr {
215    let (snd, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
216    let jh = std::thread::spawn(move || {
217        let sout = ::std::io::stderr();
218        let mut sout_lock = sout.lock();
219        for b in rcv.wait() {
220            if let Ok(b) = b {
221                if b.len() == 0 {
222                    break;
223                }
224                if sout_lock.write_all(&b).is_err() {
225                    break;
226                }
227                if sout_lock.flush().is_err() {
228                    break;
229                }
230            } else {
231                break;
232            }
233        }
234        let _ = sout_lock.write(&[]);
235    });
236    ThreadedStdout { snd, jh: Some(jh) }
237}
238
/// A sendable and clonable ThreadedStdout wrapper based on `Arc<Mutex<ThreadedStdout>>`
///
/// Note that a mutex is being locked every time a write is performed,
/// so performance may be a problem, unless you use the `lock` method.
///
/// Also note that if data is written using multiple `Write::write` calls from multiple tasks,
/// the order of chunks is not specified.
///
/// Clones share the same underlying `ThreadedStdout`.
#[derive(Clone)]
pub struct SendableStdout(Arc<Mutex<ThreadedStdout>>);
247
/// Result of `SendableStdout::lock` or `SendableStdout::try_lock`
///
/// Wraps the `MutexGuard`, so the mutex stays locked for as long as this value lives.
pub struct SendableStdoutGuard<'a>(MutexGuard<'a, ThreadedStdout>);
250
251impl SendableStdout {
252    /// wrap ThreadedStdout or ThreadedStderr in a sendable/clonable wrapper
253    pub fn new(so: ThreadedStdout) -> SendableStdout {
254        SendableStdout(Arc::new(Mutex::new(so)))
255    }
256
257    /// Acquire more permanent mutex guard on stdout, like with `std::io::Stdout::lock`
258    /// The returned guard also implements AsyncWrite
259    pub fn lock(&self) -> LockResult<SendableStdoutGuard> {
260        match self.0.lock() {
261            Ok(x) => Ok(SendableStdoutGuard(x)),
262            Err(e) => Err(PoisonError::new(SendableStdoutGuard(e.into_inner()))),
263        }
264    }
265    /// Acquire more permanent mutex guard on stdout
266    /// The returned guard also implements AsyncWrite
267    pub fn try_lock(&self) -> TryLockResult<SendableStdoutGuard> {
268        match self.0.try_lock() {
269            Ok(x) => Ok(SendableStdoutGuard(x)),
270            Err(TryLockError::Poisoned(e)) => Err(TryLockError::Poisoned(PoisonError::new(
271                SendableStdoutGuard(e.into_inner()),
272            ))),
273            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
274        }
275    }
276}
277
278impl Write for SendableStdout {
279    fn write(&mut self, buf: &[u8]) -> Result<usize> {
280        match self.0.lock() {
281            Ok(mut l) => l.write(buf),
282            Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
283        }
284    }
285    fn flush(&mut self) -> Result<()> {
286        match self.0.lock() {
287            Ok(mut l) => l.flush(),
288            Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
289        }
290    }
291}
292impl AsyncWrite for SendableStdout {
293    fn shutdown(&mut self) -> Poll<(), Error> {
294        match self.0.lock() {
295            Ok(mut l) => l.shutdown(),
296            Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
297        }
298    }
299}
// The guard already holds the mutex, so both methods go straight to the
// wrapped `ThreadedStdout` without locking again.
impl<'a> Write for SendableStdoutGuard<'a> {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.0.write(buf)
    }
    fn flush(&mut self) -> Result<()> {
        self.0.flush()
    }
}
// Delegates to the wrapped `ThreadedStdout`; the mutex is already held by the guard.
impl<'a> AsyncWrite for SendableStdoutGuard<'a> {
    fn shutdown(&mut self) -> Poll<(), Error> {
        self.0.shutdown()
    }
}
313
/// A clonable `ThreadedStdout` wrapper based on `Rc<RefCell<ThreadedStdout>>`
///
/// If you need `Send`, use `SendableStdout`.
///
/// Clones share the same underlying `ThreadedStdout`.
#[derive(Clone)]
pub struct ClonableStdout(Rc<RefCell<ThreadedStdout>>);
impl ClonableStdout {
    /// Wrap a `ThreadedStdout` or `ThreadedStderr` in a clonable (`Rc<RefCell>`-based) wrapper
    pub fn new(so: ThreadedStdout) -> ClonableStdout {
        ClonableStdout(Rc::new(RefCell::new(so)))
    }
}
324
// Each call mutably borrows the shared `RefCell` for its duration and
// forwards to the wrapped `ThreadedStdout`. `borrow_mut` panics if the cell
// is already borrowed.
impl Write for ClonableStdout {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.0.borrow_mut().write(buf)
    }
    fn flush(&mut self) -> Result<()> {
        self.0.borrow_mut().flush()
    }
}
// Mutably borrows the shared `RefCell` and forwards to
// `ThreadedStdout::shutdown`. `borrow_mut` panics if the cell is already borrowed.
impl AsyncWrite for ClonableStdout {
    fn shutdown(&mut self) -> Poll<(), Error> {
        self.0.borrow_mut().shutdown()
    }
}
338
/// Alias for SendableStdout to avoid confusion of SendableStdout being used for stderr.
///
/// Note that a mutex is being locked every time a write is performed,
/// so performance may be a problem.
///
/// Also note that if data is written using multiple `Write::write` calls from multiple tasks,
/// the order of chunks is not specified.
pub type SendableStderr = SendableStdout;
/// Result of `SendableStderr::lock` or `SendableStderr::try_lock`
pub type SendableStderrGuard<'a> = SendableStdoutGuard<'a>;
/// Alias for ClonableStdout to avoid confusion of ClonableStdout being used for stderr.
pub type ClonableStderr = ClonableStdout;
350
/// A clonable `ThreadedStdin` wrapper based on `Rc<RefCell<ThreadedStdin>>`
///
/// If you need `Send`, use `SendableStdin`.
///
/// Note that data being read is not duplicated across cloned readers used from multiple tasks.
/// Be careful about corruption.
#[derive(Clone)]
pub struct ClonableStdin(Rc<RefCell<ThreadedStdin>>);
impl ClonableStdin {
    /// Wrap a `ThreadedStdin` in a clonable (`Rc<RefCell>`-based) wrapper
    pub fn new(so: ThreadedStdin) -> ClonableStdin {
        ClonableStdin(Rc::new(RefCell::new(so)))
    }
}
364
// Mutably borrows the shared `RefCell` for the duration of the call and
// forwards to `ThreadedStdin::read`. `borrow_mut` panics if the cell is
// already borrowed.
impl Read for ClonableStdin {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.0.borrow_mut().read(buf)
    }
}
// No `AsyncRead` methods are overridden.
impl AsyncRead for ClonableStdin {}
/// A sendable and clonable ThreadedStdin wrapper based on `Arc<Mutex<ThreadedStdin>>`
///
/// Note that data being read is not duplicated across cloned readers used from multiple tasks.
/// Be careful about corruption.
///
/// Clones share the same underlying `ThreadedStdin`.
#[derive(Clone)]
pub struct SendableStdin(Arc<Mutex<ThreadedStdin>>);
377
/// Result of `SendableStdin::lock` or `SendableStdin::try_lock`
///
/// Wraps the `MutexGuard`, so the mutex stays locked for as long as this value lives.
pub struct SendableStdinGuard<'a>(MutexGuard<'a, ThreadedStdin>);
380
381impl SendableStdin {
382    /// wrap ThreadedStdin in a sendable/clonable wrapper
383    pub fn new(si: ThreadedStdin) -> SendableStdin {
384        SendableStdin(Arc::new(Mutex::new(si)))
385    }
386
387    /// Acquire more permanent mutex guard on stdout, like with `std::io::Stdout::lock`
388    /// The returned guard also implements AsyncWrite
389    pub fn lock(&self) -> LockResult<SendableStdinGuard> {
390        match self.0.lock() {
391            Ok(x) => Ok(SendableStdinGuard(x)),
392            Err(e) => Err(PoisonError::new(SendableStdinGuard(e.into_inner()))),
393        }
394    }
395    /// Acquire more permanent mutex guard on stdout
396    /// The returned guard also implements AsyncWrite
397    pub fn try_lock(&self) -> TryLockResult<SendableStdinGuard> {
398        match self.0.try_lock() {
399            Ok(x) => Ok(SendableStdinGuard(x)),
400            Err(TryLockError::Poisoned(e)) => Err(TryLockError::Poisoned(PoisonError::new(
401                SendableStdinGuard(e.into_inner()),
402            ))),
403            Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
404        }
405    }
406}
407
408impl Read for SendableStdin {
409    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
410        match self.0.lock() {
411            Ok(mut l) => l.read(buf),
412            Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
413        }
414    }
415}
416impl AsyncRead for SendableStdin {}
417
// The guard already holds the mutex, so `read` goes straight to the wrapped
// `ThreadedStdin` without locking again.
impl<'a> Read for SendableStdinGuard<'a> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        self.0.read(buf)
    }
}
// No `AsyncRead` methods are overridden.
impl<'a> AsyncRead for SendableStdinGuard<'a> {}