axon 0.3.0

Run a command like a plugin with RPC.
extern crate nix;
extern crate osaka;
extern crate mio_extras;
extern crate log;

use nix::sys::socket::socketpair;
use nix::sys::socket::AddressFamily;
use nix::sys::socket::SockType;
use nix::sys::socket::SockFlag;
use nix::unistd::dup2;
use std::os::unix::io::RawFd;
use std::io::Error;
use std::io::ErrorKind;
use std::io;
use std::net::Shutdown;
use std::process;
use std::os::unix::process::CommandExt as StdUnixCommandExt;
use std::io::{Read,Write};
use std::env;
use std::process::Command;
use nix::unistd::close;
use nix::unistd::{read, write};
use nix::unistd::fsync;
use std::mem::transmute;
use mio_extras::channel;
use std::thread;
use std::mem;
use std::time::Duration;
use osaka::mio;
use log::{error};


/// Handle to a process started through `CommandExt::spawn_with_axon`.
///
/// Dropping the handle closes both axon descriptors held in `io` and kills
/// and reaps the process from a background thread.
pub struct Child {
    /// Host-side axon endpoint used to exchange messages with the process.
    pub io:     Io,
    /// The spawned process; moved out (leaving `None`) when the handle is dropped.
    pub c:      Option<process::Child>,
    /// Receives a `()` once the background `waitpid` on the process has returned.
    pub wait:   channel::Receiver<()>,
}

/// Tears down the axon channel and terminates the child process.
impl Drop for Child {
    /// Closes both axon descriptors, then hands the process to a background
    /// thread that gives it a short grace period before killing and reaping it.
    ///
    /// `c` is a public `Option`, so a caller may already have taken the
    /// process out; in that case only the descriptors are closed. Panicking
    /// here (as an `unwrap` would) could abort the program if the drop runs
    /// during unwinding.
    fn drop(&mut self) {
        close(self.io.axon_in).ok();
        close(self.io.axon_out).ok();

        if let Some(mut c) = self.c.take() {
            thread::spawn(move || {
                // Release our end of the child's stdout before waiting.
                drop(c.stdout.take());
                // Grace period so the child can notice the closed channel and exit by itself.
                thread::sleep(Duration::from_millis(100));
                c.kill().ok();
                c.wait().ok();
            });
        }
    }
}

/// Extension trait adding axon-aware spawning to a command builder.
pub trait CommandExt {
    /// Spawns the command with an axon channel attached and returns a handle
    /// bundling the process, the host-side `Io` and an exit notification.
    fn spawn_with_axon(&mut self) -> io::Result<Child>;
}

impl CommandExt for Command {
    fn spawn_with_axon(&mut self) -> io::Result<Child> {
        let (i1, i2) = socketpair(
            AddressFamily::Unix,
            SockType::SeqPacket,
            None,
            SockFlag::empty(),
        ).unwrap();

        let (o1, o2) = socketpair(
            AddressFamily::Unix,
            SockType::SeqPacket,
            None,
            SockFlag::empty(),
        ).unwrap();


        self.env("AXON_FD_IN", "4");
        self.env("AXON_FD_OUT", "5");

        self.before_exec(move || {
            close(i1).unwrap();
            close(o1).unwrap();


            dup2(i2, 4.into())
                .map_err(|e| Error::new(ErrorKind::Other, e))
                ?;
            dup2(o2, 5.into())
                .map_err(|e| Error::new(ErrorKind::Other, e))
                ?;
            Ok(())
        });

        let (sender, receiver) = channel::channel();

        let child = self.spawn()?;

        close(i2).unwrap();
        close(o2).unwrap();


        let io = Io {
            axon_in : o1,
            axon_out: i1,
            stream:   false,
        };
        let pid = nix::unistd::Pid::from_raw(child.id() as i32);
        thread::spawn(move || {
            if let Err(e) = nix::sys::wait::waitpid(Some(pid), None) {
                error!("in pty waitpid: {}", e);
            }
            if let Err(e) = sender.send(()) {
                error!("in pty after waitpid, trying to send exit signal: {}", e);
            }

        });

        Ok(Child{
            wait: receiver,
            io,
            c: Some(child),
        })
    }

}


/// One endpoint of an axon channel, made of two raw file descriptors.
///
/// Cloning copies the descriptor numbers only; the clones refer to the same
/// underlying descriptors.
#[derive(Clone)]
pub struct Io {
    /// Descriptor that is read from and registered with the poller.
    axon_in:  RawFd,
    /// Descriptor that is written to.
    axon_out: RawFd,
    /// When `true`, every write is framed with a length header and newlines
    /// (set by `from_std`, which talks over stdin/stdout).
    stream:   bool,
}


/// Lets an `Io` be polled for readiness; all three operations are forwarded
/// to the read descriptor (`axon_in`) wrapped in an `EventedFd`.
impl osaka::mio::event::Evented for Io {
    fn register(&self, poll: &mio::Poll, token: mio::Token, interest: mio::Ready, opts: mio::PollOpt) -> io::Result<()> {
        let source = mio::unix::EventedFd(&self.axon_in);
        mio::event::Evented::register(&source, poll, token, interest, opts)
    }

    fn reregister(&self, poll: &mio::Poll, token: mio::Token, interest: mio::Ready, opts: mio::PollOpt) -> io::Result<()> {
        let source = mio::unix::EventedFd(&self.axon_in);
        mio::event::Evented::reregister(&source, poll, token, interest, opts)
    }

    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        let source = mio::unix::EventedFd(&self.axon_in);
        mio::event::Evented::deregister(&source, poll)
    }
}

impl Io {
    /// Marks the read descriptor (`axon_in`) close-on-exec and non-blocking.
    ///
    /// `O_NONBLOCK` is OR-ed into the descriptor's current status flags, so
    /// flags that were already set are kept rather than overwritten.
    ///
    /// # Errors
    ///
    /// Returns the failure of any of the underlying `fcntl` calls.
    pub fn make_async(&mut self) -> io::Result<()> {
        use nix::fcntl::{fcntl, FdFlag, OFlag};
        use nix::fcntl::FcntlArg::{F_GETFL, F_SETFD, F_SETFL};

        fcntl(self.axon_in, F_SETFD(FdFlag::FD_CLOEXEC))
            .map_err(|e| Error::new(ErrorKind::Other, e))?;

        // F_SETFL replaces the whole status-flag set, so start from the current one.
        let current = fcntl(self.axon_in, F_GETFL)
            .map_err(|e| Error::new(ErrorKind::Other, e))?;
        let flags = OFlag::from_bits_truncate(current) | OFlag::O_NONBLOCK;
        fcntl(self.axon_in, F_SETFL(flags))
            .map_err(|e| Error::new(ErrorKind::Other, e))?;
        Ok(())
    }

    /// Closes the descriptor(s) for the requested direction.
    ///
    /// The descriptors are closed outright rather than half-closed, so this
    /// `Io` (and any clone of it) must not be used for that direction
    /// afterwards. Close errors are ignored and `Ok(())` is always returned.
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        match how {
            Shutdown::Read => {
                close(self.axon_in).ok();
            },
            Shutdown::Write => {
                close(self.axon_out).ok();
            },
            Shutdown::Both => {
                close(self.axon_in).ok();
                close(self.axon_out).ok();
            },
        }
        Ok(())
    }
}

impl Read for Io {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match read(self.axon_in, buf) {
            Ok(v) => Ok(v),
            Err(nix::Error::Sys(errno)) => Err(io::Error::from_raw_os_error(errno as i32)),
            Err(e) => Err(Error::new(ErrorKind::Other, e)),
        }
    }
}

impl Read for &Io {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match read(self.axon_in, buf) {
            Ok(v) => Ok(v),
            Err(nix::Error::Sys(errno)) => Err(io::Error::from_raw_os_error(errno as i32)),
            Err(e) => Err(Error::new(ErrorKind::Other, e)),
        }
    }
}

impl Write for Io {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.stream {
            let len = buf.len() as u64;
            let len : [u8; 8] = unsafe {transmute(len)};
            write(self.axon_out, &len).ok();
            write(self.axon_out, b"\n").ok();
        }
        let r = match write(self.axon_out, buf) {
            Ok(v) => Ok(v),
            Err(nix::Error::Sys(errno)) => Err(io::Error::from_raw_os_error(errno as i32)),
            Err(e) => Err(Error::new(ErrorKind::Other, e)),
        };

        if self.stream {
            write(self.axon_out, b"\n").ok();
        }

        r
    }

    fn flush(&mut self) -> io::Result<()> {
        fsync(self.axon_out).ok();
        Ok(())
    }
}


/// Builds an `Io` from the descriptor numbers found in the `AXON_FD_OUT` and
/// `AXON_FD_IN` environment variables.
///
/// Fails when either variable is missing or does not parse as a descriptor
/// number. `AXON_FD_OUT` is checked first.
fn from_axion() -> Result<Io, Error> {
    /// Reads `var` and parses it as a descriptor; `missing` is the error text
    /// used when the variable cannot be read.
    fn fd_from_env(var: &str, missing: &str) -> Result<RawFd, Error> {
        let raw = env::var(var)
            .map_err(|_| Error::new(ErrorKind::Other, missing.to_string()))?;
        raw.parse()
            .map_err(|e| Error::new(ErrorKind::Other, e))
    }

    let axon_out = fd_from_env(
        "AXON_FD_OUT",
        "AXON_FD_OUT missing. executable needs to be spawned from an axon host",
    )?;
    let axon_in = fd_from_env(
        "AXON_FD_IN",
        "AXON_FD_IN  missing. executable needs to be spawned from an axon host",
    )?;

    Ok(Io {
        axon_in,
        axon_out,
        stream: false,
    })
}

pub fn from_std() -> Io {
    Io {
        axon_in:  0.into(),
        axon_out: 1.into(),
        stream: true,
    }
}

/// Returns the `Io` a plugin process should talk over.
///
/// Uses the descriptors announced through the axon environment variables
/// when available; otherwise prints the reason to stderr and falls back to
/// stdin/stdout.
pub fn child() -> Io {
    from_axion().unwrap_or_else(|e| {
        eprintln!("{}", e);
        from_std()
    })
}