1#![deny(missing_docs)]
2
3extern crate futures;
34extern crate tokio_io;
35
36const BUFSIZ: usize = 8192;
37
38use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
39use std::cell::RefCell;
40use std::io::{Error, ErrorKind, Read, Result, Write};
41use std::rc::Rc;
42use std::sync::{Arc, LockResult, Mutex, MutexGuard, PoisonError, TryLockError, TryLockResult};
43use std::thread::JoinHandle;
44use tokio_io::{AsyncRead, AsyncWrite};
45
46type BBR = futures::sync::mpsc::Receiver<Box<[u8]>>;
47type BBS = futures::sync::mpsc::Sender<Box<[u8]>>;
48
49pub struct ThreadedStdin {
51 debt: Option<Box<[u8]>>,
52 rcv: BBR,
53}
54
55impl ThreadedStdin {
56 pub fn make_sendable(self) -> SendableStdin {
58 SendableStdin::new(self)
59 }
60 pub fn make_clonable(self) -> ClonableStdin {
62 ClonableStdin::new(self)
63 }
64}
65
66pub fn stdin(queue_size: usize) -> ThreadedStdin {
68 let (snd_, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
69 std::thread::spawn(move || {
70 let mut snd = snd_;
71 let sin = ::std::io::stdin();
72 let mut sin_lock = sin.lock();
73 let mut buf = vec![0; BUFSIZ];
74 while let Ok(ret) = sin_lock.read(&mut buf[..]) {
75 let content = buf[0..ret].to_vec().into_boxed_slice();
76 snd = match snd.send(content).wait() {
77 Ok(x) => x,
78 Err(_) => break,
79 }
80 }
81 });
82 ThreadedStdin { debt: None, rcv }
83}
84
85impl AsyncRead for ThreadedStdin {}
86impl Read for ThreadedStdin {
87 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
88 let mut handle_the_buffer = |incoming_buf: Box<[u8]>| {
89 let l = buf.len();
90 let dl = incoming_buf.len();
91 if l >= dl {
92 buf[0..dl].copy_from_slice(&incoming_buf);
93 (None, Ok(dl))
94 } else {
95 buf[0..l].copy_from_slice(&incoming_buf[0..l]);
96 let newdebt = Some(incoming_buf[l..].to_vec().into_boxed_slice());
97 (newdebt, Ok(l))
98 }
99 };
100
101 let (new_debt, ret) = if let Some(debt) = self.debt.take() {
102 handle_the_buffer(debt)
103 } else {
104 match self.rcv.poll() {
105 Ok(Async::Ready(Some(newbuf))) => handle_the_buffer(newbuf),
106 Ok(Async::Ready(None)) => (None, Err(ErrorKind::BrokenPipe.into())),
107 Ok(Async::NotReady) => (None, Err(ErrorKind::WouldBlock.into())),
108 Err(_) => (None, Err(ErrorKind::Other.into())),
109 }
110 };
111 self.debt = new_debt;
112 ret
113 }
114}
115
116pub struct ThreadedStdout {
118 snd: BBS,
119 jh: Option<JoinHandle<()>>,
120}
121
122impl ThreadedStdout {
123 pub fn make_sendable(self) -> SendableStdout {
125 SendableStdout::new(self)
126 }
127 pub fn make_clonable(self) -> ClonableStdout {
129 ClonableStdout::new(self)
130 }
131}
132pub fn stdout(queue_size: usize) -> ThreadedStdout {
134 let (snd, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
135 let jh = std::thread::spawn(move || {
136 let sout = ::std::io::stdout();
137 let mut sout_lock = sout.lock();
138 for b in rcv.wait() {
139 if let Ok(b) = b {
140 if b.len() == 0 {
141 break;
142 }
143 if sout_lock.write_all(&b).is_err() {
144 break;
145 }
146 if sout_lock.flush().is_err() {
147 break;
148 }
149 } else {
150 break;
151 }
152 }
153 let _ = sout_lock.write(&[]);
154 });
155 ThreadedStdout { snd, jh: Some(jh) }
156}
157
158impl AsyncWrite for ThreadedStdout {
159 fn shutdown(&mut self) -> Poll<(), Error> {
160 match self.snd.start_send(vec![].into_boxed_slice()) {
162 Ok(AsyncSink::Ready) => (),
163 Ok(AsyncSink::NotReady(_)) => return Ok(Async::NotReady),
164 Err(_) => {
165 return Ok(Async::Ready(()));
166 }
167 };
168 match self.snd.poll_complete() {
169 Ok(Async::Ready(_)) => (),
170 Ok(Async::NotReady) => return Ok(Async::NotReady),
171 Err(e) => {
172 return Ok(Async::Ready(()));
173 }
174 };
175 if self.snd.close().is_err() {
176 return Ok(Async::Ready(()));
177 };
178 if let Some(jh) = self.jh.take() {
179 if jh.join().is_err() {
180 return Err(ErrorKind::Other.into());
181 };
182 }
183 Ok(Async::Ready(()))
184 }
185}
186impl Write for ThreadedStdout {
187 fn write(&mut self, buf: &[u8]) -> Result<usize> {
188 if buf.is_empty() {
189 return Ok(0);
190 }
191
192 match self.snd.start_send(buf.to_vec().into_boxed_slice()) {
193 Ok(AsyncSink::Ready) => (),
194 Ok(AsyncSink::NotReady(_)) => return Err(ErrorKind::WouldBlock.into()),
195 Err(_) => return Err(ErrorKind::Other.into()),
196 }
197
198 Ok(buf.len())
199 }
200 fn flush(&mut self) -> Result<()> {
201 match self.snd.poll_complete() {
202 Ok(Async::Ready(_)) => Ok(()),
203 Ok(Async::NotReady) => Err(ErrorKind::WouldBlock.into()),
204 Err(_) => Err(ErrorKind::Other.into()),
205 }
206 }
207}
208
209pub type ThreadedStderr = ThreadedStdout;
213pub fn stderr(queue_size: usize) -> ThreadedStderr {
215 let (snd, rcv): (BBS, BBR) = futures::sync::mpsc::channel(queue_size);
216 let jh = std::thread::spawn(move || {
217 let sout = ::std::io::stderr();
218 let mut sout_lock = sout.lock();
219 for b in rcv.wait() {
220 if let Ok(b) = b {
221 if b.len() == 0 {
222 break;
223 }
224 if sout_lock.write_all(&b).is_err() {
225 break;
226 }
227 if sout_lock.flush().is_err() {
228 break;
229 }
230 } else {
231 break;
232 }
233 }
234 let _ = sout_lock.write(&[]);
235 });
236 ThreadedStdout { snd, jh: Some(jh) }
237}
238
239#[derive(Clone)]
246pub struct SendableStdout(Arc<Mutex<ThreadedStdout>>);
247
248pub struct SendableStdoutGuard<'a>(MutexGuard<'a, ThreadedStdout>);
250
251impl SendableStdout {
252 pub fn new(so: ThreadedStdout) -> SendableStdout {
254 SendableStdout(Arc::new(Mutex::new(so)))
255 }
256
257 pub fn lock(&self) -> LockResult<SendableStdoutGuard> {
260 match self.0.lock() {
261 Ok(x) => Ok(SendableStdoutGuard(x)),
262 Err(e) => Err(PoisonError::new(SendableStdoutGuard(e.into_inner()))),
263 }
264 }
265 pub fn try_lock(&self) -> TryLockResult<SendableStdoutGuard> {
268 match self.0.try_lock() {
269 Ok(x) => Ok(SendableStdoutGuard(x)),
270 Err(TryLockError::Poisoned(e)) => Err(TryLockError::Poisoned(PoisonError::new(
271 SendableStdoutGuard(e.into_inner()),
272 ))),
273 Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
274 }
275 }
276}
277
278impl Write for SendableStdout {
279 fn write(&mut self, buf: &[u8]) -> Result<usize> {
280 match self.0.lock() {
281 Ok(mut l) => l.write(buf),
282 Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
283 }
284 }
285 fn flush(&mut self) -> Result<()> {
286 match self.0.lock() {
287 Ok(mut l) => l.flush(),
288 Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
289 }
290 }
291}
292impl AsyncWrite for SendableStdout {
293 fn shutdown(&mut self) -> Poll<(), Error> {
294 match self.0.lock() {
295 Ok(mut l) => l.shutdown(),
296 Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
297 }
298 }
299}
300impl<'a> Write for SendableStdoutGuard<'a> {
301 fn write(&mut self, buf: &[u8]) -> Result<usize> {
302 self.0.write(buf)
303 }
304 fn flush(&mut self) -> Result<()> {
305 self.0.flush()
306 }
307}
308impl<'a> AsyncWrite for SendableStdoutGuard<'a> {
309 fn shutdown(&mut self) -> Poll<(), Error> {
310 self.0.shutdown()
311 }
312}
313
314#[derive(Clone)]
317pub struct ClonableStdout(Rc<RefCell<ThreadedStdout>>);
318impl ClonableStdout {
319 pub fn new(so: ThreadedStdout) -> ClonableStdout {
321 ClonableStdout(Rc::new(RefCell::new(so)))
322 }
323}
324
325impl Write for ClonableStdout {
326 fn write(&mut self, buf: &[u8]) -> Result<usize> {
327 self.0.borrow_mut().write(buf)
328 }
329 fn flush(&mut self) -> Result<()> {
330 self.0.borrow_mut().flush()
331 }
332}
333impl AsyncWrite for ClonableStdout {
334 fn shutdown(&mut self) -> Poll<(), Error> {
335 self.0.borrow_mut().shutdown()
336 }
337}
338
339pub type SendableStderr = SendableStdout;
346pub type SendableStderrGuard<'a> = SendableStdoutGuard<'a>;
348pub type ClonableStderr = ClonableStdout;
350
351#[derive(Clone)]
357pub struct ClonableStdin(Rc<RefCell<ThreadedStdin>>);
358impl ClonableStdin {
359 pub fn new(so: ThreadedStdin) -> ClonableStdin {
361 ClonableStdin(Rc::new(RefCell::new(so)))
362 }
363}
364
365impl Read for ClonableStdin {
366 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
367 self.0.borrow_mut().read(buf)
368 }
369}
370impl AsyncRead for ClonableStdin {}
371#[derive(Clone)]
376pub struct SendableStdin(Arc<Mutex<ThreadedStdin>>);
377
378pub struct SendableStdinGuard<'a>(MutexGuard<'a, ThreadedStdin>);
380
381impl SendableStdin {
382 pub fn new(si: ThreadedStdin) -> SendableStdin {
384 SendableStdin(Arc::new(Mutex::new(si)))
385 }
386
387 pub fn lock(&self) -> LockResult<SendableStdinGuard> {
390 match self.0.lock() {
391 Ok(x) => Ok(SendableStdinGuard(x)),
392 Err(e) => Err(PoisonError::new(SendableStdinGuard(e.into_inner()))),
393 }
394 }
395 pub fn try_lock(&self) -> TryLockResult<SendableStdinGuard> {
398 match self.0.try_lock() {
399 Ok(x) => Ok(SendableStdinGuard(x)),
400 Err(TryLockError::Poisoned(e)) => Err(TryLockError::Poisoned(PoisonError::new(
401 SendableStdinGuard(e.into_inner()),
402 ))),
403 Err(TryLockError::WouldBlock) => Err(TryLockError::WouldBlock),
404 }
405 }
406}
407
408impl Read for SendableStdin {
409 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
410 match self.0.lock() {
411 Ok(mut l) => l.read(buf),
412 Err(e) => Err(Error::new(ErrorKind::Other, format!("{}", e))),
413 }
414 }
415}
416impl AsyncRead for SendableStdin {}
417
418impl<'a> Read for SendableStdinGuard<'a> {
419 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
420 self.0.read(buf)
421 }
422}
423impl<'a> AsyncRead for SendableStdinGuard<'a> {}