baking 2.0.1

background rust compiler
use {
    crate::*,
    anyhow::Result,
    crossbeam::channel::{bounded, select, unbounded, Receiver, Sender},
    std::{
        io::{BufRead, BufReader},
        process::Stdio,
        thread,
    },
};

/// an executor calling a cargo (or similar) command in a separate
/// thread when asked to and sending the lines of output in a channel,
/// and finishing each execution with a `CommandExecInfo::End`.
/// Channel sizes are designed to avoid useless computations.
pub struct Executor {
    /// receiving end of the channel on which the thread sends the
    /// output lines, the errors, and the end of execution notification
    pub line_receiver: Receiver<CommandExecInfo>,
    /// channel used to ask the thread to run the command
    task_sender: Sender<Task>,
    stop_sender: Sender<()>, // signal for stopping the thread
    /// handle of the thread running the command and reading its output
    thread: thread::JoinHandle<()>,
}

/// A request for one execution of the command by the executor.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Task {
    /// whether the command must be run with `RUST_BACKTRACE` set
    /// to "1" (it's set to "0" otherwise)
    pub backtrace: bool,
}

impl Executor {
    /// Build the executor for the mission's command and spawn its
    /// worker thread.
    ///
    /// The worker thread waits for tasks. For each of them, it launches
    /// the command (with `RUST_BACKTRACE` set according to the task) and
    /// sends the lines of its stderr on the line channel.
    /// If the mission needs stdout, its lines are captured and sent too
    /// (they're read on a dedicated thread).
    /// An execution is concluded by a `CommandExecInfo::End` holding the
    /// exit status (or `None` when waiting for the child failed).
    /// Launch and read failures are sent as `CommandExecInfo::Error`.
    ///
    /// The thread ends when it receives a stop signal (see `die`) or
    /// when one of its channels is disconnected.
    pub fn new(mission: &Mission) -> Result<Self> {
        let mut command = mission.get_command();
        let with_stdout = mission.need_stdout();
        // capacity 1: at most one task can be pending
        let (task_sender, task_receiver) = bounded::<Task>(1);
        // capacity 0: sending a stop signal blocks until the thread takes it
        let (stop_sender, stop_receiver) = bounded(0);
        let (line_sender, line_receiver) = unbounded();
        command
            .stderr(Stdio::piped())
            .stdout(if with_stdout { Stdio::piped() } else { Stdio::null() });
        let thread = thread::spawn(move || {
            loop {
                select! {
                    recv(task_receiver) -> task => {
                        let task = match task {
                            Ok(task) => task,
                            // the task sender is gone: nothing more to do
                            _ => { break; }
                        };
                        debug!("starting task {:?}", task);
                        command.env(
                            "RUST_BACKTRACE",
                            if task.backtrace { "1" } else { "0" },
                        );
                        let child = command.spawn();
                        let mut child = match child {
                            Ok(child) => child,
                            Err(e) => {
                                if let Err(e) = line_sender.send(CommandExecInfo::Error(
                                    format!("command launch failed: {}", e)
                                )) {
                                    debug!("error when sending launch error: {}", e);
                                    break;
                                }
                                continue;
                            }
                        };
                        // stderr has been configured as piped before the
                        // thread was spawned, so it's expected to be available
                        let stderr = match child.stderr.take() {
                            Some(stderr) => stderr,
                            None => {
                                if let Err(e) = line_sender.send(CommandExecInfo::Error(
                                    "taking stderr failed".to_string()
                                )) {
                                    debug!("error when sending stderr error: {}", e);
                                    break;
                                }
                                continue;
                            }
                        };
                        // if we need stdout, we listen for it on a separate thread
                        // (if somebody knows of an efficient and clean cross-platform way
                        // to listen for both stderr and stdout on the same thread, please tell me)
                        let out_thread = if with_stdout {
                            let stdout = match child.stdout.take() {
                                Some(stdout) => stdout,
                                None => {
                                    if let Err(e) = line_sender.send(CommandExecInfo::Error(
                                        "taking stdout failed".to_string()
                                    )) {
                                        debug!("error when sending stdout error: {}", e);
                                        break;
                                    }
                                    continue;
                                }
                            };
                            let line_sender = line_sender.clone();
                            Some(thread::spawn(move|| {
                                // the same buffer is reused for all lines
                                let mut buf = String::new();
                                let mut buf_reader = BufReader::new(stdout);
                                loop {
                                    let r = match buf_reader.read_line(&mut buf) {
                                        Err(e) => CommandExecInfo::Error(e.to_string()),
                                        Ok(0) => {
                                            // finished
                                            break;
                                        }
                                        Ok(_) => {
                                            // debug!("STDOUT : {:?}", &buf);
                                            CommandExecInfo::Line(CommandOutputLine {
                                                origin: CommandStream::StdOut,
                                                content: TLine::from_tty(buf.trim_end()),
                                            })
                                        }
                                    };
                                    if let Err(e) = line_sender.send(r) {
                                        debug!("error when sending stdout line: {}", e);
                                        break;
                                    }
                                    buf.clear();
                                }
                            }))
                        } else {
                            None
                        };
                        // the same buffer is reused for all lines
                        let mut buf = String::new();
                        let mut buf_reader = BufReader::new(stderr);
                        loop {
                            // the stop signal is only checked between two lines:
                            // reading a line is blocking
                            if let Ok(()) = stop_receiver.try_recv() {
                                debug!("stopping during execution");
                                match child.kill() {
                                    Ok(()) => {
                                        debug!("command stopped");
                                        // a killed child stays around as a zombie
                                        // until it's waited for, so we reap it
                                        // before leaving the thread
                                        if let Err(e) = child.wait() {
                                            debug!("error when waiting for killed command: {}", e);
                                        }
                                    }
                                    _ => debug!("command already stopped"),
                                }
                                return;
                            }
                            let r = match buf_reader.read_line(&mut buf) {
                                Err(e) => CommandExecInfo::Error(e.to_string()),
                                Ok(0) => {
                                    // finished
                                    break;
                                }
                                Ok(_) => {
                                    // debug!("STDERR : {:?}", &buf);
                                    CommandExecInfo::Line(CommandOutputLine {
                                        origin: CommandStream::StdErr,
                                        content: TLine::from_tty(buf.trim_end()),
                                    })
                                }
                            };
                            if let Err(e) = line_sender.send(r) {
                                debug!("error when sending stderr line: {}", e);
                                break;
                            }
                            buf.clear();
                        }
                        let status = match child.wait() {
                            Ok(exit_status) => {
                                debug!("exit_status: {:?}", &exit_status);
                                Some(exit_status)
                            }
                            Err(e) => {
                                warn!("error in child: {:?}", e);
                                None
                            }
                        };
                        // NOTE(review): End is sent before the stdout thread is
                        // joined, so some stdout lines may arrive after End on
                        // the line channel
                        if let Err(e) = line_sender.send(CommandExecInfo::End { status }) {
                            debug!("error when sending line: {}", e);
                            break;
                        }
                        debug!("finished command execution");
                        if let Some(thread) = out_thread {
                            debug!("waiting for out listening thread to join");
                            thread.join().unwrap();
                            debug!("out listening thread joined");
                        }
                    }
                    recv(stop_receiver) -> _ => {
                        debug!("leaving thread");
                        return;
                    }
                }
            }
        });
        Ok(Self {
            line_receiver,
            task_sender,
            stop_sender,
            thread,
        })
    }
    /// notify the executor a computation is necessary
    ///
    /// The task is sent without blocking: an error is returned when
    /// it can't be queued (a task is already pending, or the executor
    /// thread is gone).
    pub fn start(&self, task: Task) -> Result<()> {
        self.task_sender.try_send(task)?;
        Ok(())
    }
    /// Ask the executor thread to stop (killing the command if it's
    /// running) and wait for the thread to finish.
    ///
    /// This blocks until the thread takes the stop signal, which it
    /// only does when idle or between two lines of stderr.
    ///
    /// An error is returned if the executor thread panicked.
    pub fn die(self) -> Result<()> {
        debug!("received kill order");
        // Sending only fails when the thread is already gone (the receiver
        // is dropped): there's then nobody to stop, and joining below tells
        // us whether the thread ended normally
        if self.stop_sender.send(()).is_err() {
            debug!("executor thread already finished");
        }
        self.thread.join().map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::Other, "executor thread panicked")
        })?;
        Ok(())
    }
}