cb2_core 0.1.2

The core for the cb2 command runner
Documentation
use crate::exec::FutureSig;
use crate::report::Report;
use crate::task::TaskItem;
use chrono::Duration;
use chrono::Utc;
use futures::future::lazy;
use futures::sync::mpsc::Sender;
use futures::sync::oneshot;
use futures::Future;
use futures::Sink;
use std::process::Command;
use std::process::Stdio;

pub fn task_item(task_item: TaskItem, sender: Sender<Report>) -> FutureSig {
    let cmd_clone = task_item.cmd.clone();
    let id_clone = task_item.id.clone();
    let id_clone2 = task_item.id.clone();
    Box::new(lazy(move || {
        let (tx, rx) = oneshot::channel();
        tokio::spawn(lazy(move || {
            let (tx1, rx1) = oneshot::channel();
            tokio::spawn(
                sender
                    .clone()
                    .send(Report::Started {
                        id: id_clone.clone(),
                        time: Utc::now(),
                    })
                    .then(|v| {
                        match v {
                            Ok(_x) => {}
                            Err(e) => {
                                eprintln!("{}", e);
                            }
                        }
                        tx1.send(())
                    })
                    .map(|_val| ())
                    .map_err(|_e: ()| ()),
            );
            rx1.then(move |_report| {
                let begin_time = Utc::now();
                let mut child = Command::new("sh");
                child.arg("-c").arg(cmd_clone);
                child.stdin(Stdio::inherit());
                child.stdout(Stdio::inherit());
                match child.status() {
                    Ok(s) => {
                        let outgoing = if s.success() {
                            Report::End {
                                id: id_clone.clone(),
                                time: Utc::now(),
                                dur: Utc::now().signed_duration_since(begin_time),
                            }
                        } else {
                            Report::Error {
                                id: id_clone.clone(),
                                time: Utc::now(),
                                dur: Utc::now().signed_duration_since(begin_time),
                            }
                        };
                        tokio::spawn(
                            sender
                                .clone()
                                .send(outgoing.clone())
                                .map(|_val| ())
                                .map_err(|_e| ()),
                        );
                        match tx.send(report_wrap(outgoing)) {
                            Ok(_s) => {
                                debug!("sent oneshot for {}", id_clone);
                            }
                            Err(_e) => {
                                error!("failed to send oneshot for {}", id_clone);
                            }
                        }
                        Ok(())
                    }
                    Err(_e) => Err(()),
                }
            })
        }));
        rx.map_err(move |_e| Report::Error {
            id: id_clone2,
            time: Utc::now(),
            dur: Duration::seconds(0),
        })
    }))
}

fn report_wrap(report: Report) -> Result<Report, Report> {
    match report {
        Report::Error { .. } | Report::ErrorGroup { .. } => Err(report),
        _ => Ok(report),
    }
}