pipeawesome2 0.1.3

Loops, branches and joins to UNIX pipes… in a sane way…
Documentation
use crate::motion::PullJourney;
use crate::motion::Journey;
use crate::connectable::Breakable;
use std::time::Instant;
use crate::connectable::Connectable;
use crate::connectable::ConnectableAddInputError;
use crate::connectable::OutputPort;
use crate::connectable::ConnectableAddOutputError;
use super::motion::{ MotionError, MotionResult, MotionNotifications, Pull, Push, motion };

use crate::{startable_control::StartableControl};
use async_trait::async_trait;

#[allow(clippy::new_without_default)]
pub struct Drain {
    started: bool,
    stdin: Option<Pull>,
    write_location: Option<String>,
}

impl Drain {

    pub fn new(write_location: String) -> Drain {
        Drain {
            started: false,
            stdin: None,
            write_location: Some(write_location)
        }
    }

}


impl Connectable for Drain {

    fn add_output(&mut self, port: OutputPort, _breakable: Breakable, _src_id: usize, _dst_id: usize) -> std::result::Result<Pull, ConnectableAddOutputError> {
        Err(ConnectableAddOutputError::UnsupportedPort(port))
    }

    fn add_input(&mut self, pull: Pull, unused_priority: isize) -> std::result::Result<(), ConnectableAddInputError> {
        if unused_priority != 0 {
            return Err(ConnectableAddInputError::UnsupportedPriority(unused_priority));
        }
        if self.stdin.is_some() {
            return Err(ConnectableAddInputError::AlreadyAllocated);
        }
        self.stdin = Some(pull);
        Ok(())
    }

}



#[async_trait]
impl StartableControl for Drain {
    async fn start(&mut self) -> MotionResult<usize> {


        self.started = true;
        if let Some(pull) = std::mem::take(&mut self.stdin) {

            let push = match (pull.journey(), std::mem::take(&mut self.write_location)) {
                (Some(PullJourney { src, dst }), Some(f)) if f == "-" => Push::Stdout(Journey { src: *src, dst: *dst, breakable: Breakable::Terminate }, async_std::io::stdout()),
                (Some(PullJourney { src, dst }), Some(f)) if f == "_" => Push::Stderr(Journey { src: *src, dst: *dst, breakable: Breakable::Terminate }, async_std::io::stderr()),
                (Some(PullJourney { src, dst }), Some(filename)) => {
                    let breakable = Breakable::Terminate;
                    let file = async_std::fs::File::create(filename).await
                        .map_err(|e| MotionError::OpenIOError(PullJourney { src: *src, dst: *dst }, Instant::now(), e))?;
                    Push::File(Journey { src: *src, dst: *dst, breakable }, async_std::io::BufWriter::new(file))
                },
                _ => Push::None,
            };

            return motion(pull, MotionNotifications::empty(), push).await
        }
        MotionResult::Err(MotionError::NoneError)
    }
}