buildkit-frontend 0.1.0

Foundation for BuildKit frontends implemented in Rust
Documentation
use std::io::{self, stdin, stdout};
use std::io::{Error, Read, Write};

use http_connection::HttpConnection;
use hyper::client::connect::{Connect, Connected, Destination};
use tokio::prelude::*;
use tokio::reactor::PollEvented2;

pub struct StdioSocket {
    reader: PollEvented2<async_stdio::EventedStdin>,
    writer: PollEvented2<async_stdio::EventedStdout>,
}

impl StdioSocket {
    pub fn try_new() -> io::Result<Self> {
        Ok(StdioSocket {
            reader: PollEvented2::new(async_stdio::EventedStdin::try_new(stdin())?),
            writer: PollEvented2::new(async_stdio::EventedStdout::try_new(stdout())?),
        })
    }
}

pub struct StdioConnector;

impl Read for StdioSocket {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        self.reader.read(buf)
    }
}

impl Write for StdioSocket {
    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.writer.write(buf)
    }

    fn flush(&mut self) -> Result<(), Error> {
        self.writer.flush()
    }
}

impl AsyncRead for StdioSocket {}

impl AsyncWrite for StdioSocket {
    fn shutdown(&mut self) -> Result<Async<()>, Error> {
        self.writer.shutdown()
    }
}

impl HttpConnection for StdioSocket {}

impl Connect for StdioConnector {
    type Transport = StdioSocket;
    type Error = io::Error;
    type Future = future::FutureResult<(Self::Transport, Connected), io::Error>;

    fn connect(&self, _: Destination) -> Self::Future {
        future::result(StdioSocket::try_new().map(|socket| (socket, Connected::new())))
    }
}

mod async_stdio {
    use std::io::{self, Read, Stdin, Stdout, Write};
    use std::os::unix::io::AsRawFd;

    use mio::event::Evented;
    use mio::unix::EventedFd;
    use mio::{Poll, PollOpt, Ready, Token};

    use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};

    pub struct EventedStdin(Stdin);
    pub struct EventedStdout(Stdout);

    impl EventedStdin {
        pub fn try_new(stdin: Stdin) -> io::Result<Self> {
            set_non_blocking_flag(&stdin)?;
            Ok(EventedStdin(stdin))
        }
    }

    impl EventedStdout {
        pub fn try_new(stdout: Stdout) -> io::Result<Self> {
            set_non_blocking_flag(&stdout)?;
            Ok(EventedStdout(stdout))
        }
    }

    impl Evented for EventedStdin {
        fn register(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
        }

        fn reregister(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
        }

        fn deregister(&self, poll: &Poll) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).deregister(poll)
        }
    }

    impl Read for EventedStdin {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0.read(buf)
        }
    }

    impl Evented for EventedStdout {
        fn register(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
        }

        fn reregister(
            &self,
            poll: &Poll,
            token: Token,
            interest: Ready,
            opts: PollOpt,
        ) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
        }

        fn deregister(&self, poll: &Poll) -> io::Result<()> {
            EventedFd(&self.0.as_raw_fd()).deregister(poll)
        }
    }

    impl Write for EventedStdout {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.write(buf)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.0.flush()
        }
    }

    fn set_non_blocking_flag<T: AsRawFd>(stream: &T) -> io::Result<()> {
        let flags = unsafe { fcntl(stream.as_raw_fd(), F_GETFL, 0) };

        if flags < 0 {
            return Err(std::io::Error::last_os_error());
        }

        if unsafe { fcntl(stream.as_raw_fd(), F_SETFL, flags | O_NONBLOCK) } != 0 {
            return Err(std::io::Error::last_os_error());
        }

        Ok(())
    }
}