use std::io::{Read, Stdout, Stderr, Write};
use std::process::Child;
use std::sync::mpsc::Sender;
use super::super::disk_buffer::DiskBufferWriter;
pub enum Pipe {
Stdout(String),
Stderr(String, String)
}
pub enum State {
Processing(JobOutput),
Completed(usize, String),
Error(usize, String),
}
pub struct JobOutput {
pub id: usize,
pub pipe: Pipe,
}
impl Pipe {
pub fn print_message(&self, id: usize, error_file: &mut DiskBufferWriter, stdout: &Stdout,
stderr: &Stderr)
{
let mut stdout = stdout.lock();
let mut stderr = stderr.lock();
match *self {
Pipe::Stdout(ref message) => {
let _ = stdout.write(message.as_bytes());
},
Pipe::Stderr(ref name, ref message) => {
let _ = stderr.write(message.as_bytes());
if let Err(why) = error_file.write(id.to_string().as_bytes())
.and_then(|_| error_file.write(b": "))
.and_then(|_| error_file.write(name.as_bytes()))
.and_then(|_| error_file.write(b": "))
.and_then(|_| error_file.write(message.as_bytes()))
.and_then(|_| error_file.write(b"\n"))
{
let _ = stderr.write(b"parallel: I/O error: ");
let _ = stderr.write(why.to_string().as_bytes());
}
}
}
}
}
pub fn output(child: &mut Child, job_id: usize, name: String, output_tx: &Sender<State>, quiet: bool) {
let stderr = child.stderr.as_mut().expect("unable to open stderr of child");
let mut membuffer = [0u8; 8 * 1024];
if quiet {
while let Ok(bytes_read) = stderr.read(&mut membuffer[..]) {
if bytes_read != 0 {
let output = String::from_utf8_lossy(&membuffer[0..bytes_read]);
let _ = output_tx.send(State::Processing(JobOutput {
id: job_id,
pipe: Pipe::Stderr(name.clone(), output.into_owned())
}));
} else {
break
}
}
} else {
let mut stdout = child.stdout.as_mut().expect("unable to open stdout of child");
loop {
if let Ok(bytes_read) = stdout.read(&mut membuffer[..]) {
if bytes_read != 0 {
let output = String::from_utf8_lossy(&membuffer[0..bytes_read]);
let _ = output_tx.send(State::Processing(JobOutput {
id: job_id,
pipe: Pipe::Stdout(output.into_owned())
}));
} else if let Ok(bytes_read) = stderr.read(&mut membuffer[..]) {
if bytes_read != 0 {
let output = String::from_utf8_lossy(&membuffer[0..bytes_read]);
let _ = output_tx.send(State::Processing(JobOutput {
id: job_id,
pipe: Pipe::Stderr(name.clone(), output.into_owned())
}));
} else {
break
}
}
} else if let Ok(bytes_read) = stderr.read(&mut membuffer[..]) {
if bytes_read != 0 {
let output = String::from_utf8_lossy(&membuffer[0..bytes_read]);
let _ = output_tx.send(State::Processing(JobOutput {
id: job_id,
pipe: Pipe::Stderr(name.clone(), output.into_owned())
}));
} else {
break
}
} else {
break
}
}
}
let _ = output_tx.send(State::Completed(job_id, name));
}