use crate::client::Client;
use crate::frontend::{Frontend, FrontendBuilder};
use crate::protocol::Endpoint;
use crate::ClientError;
use bytes::BytesMut;
use futures::{Future, Poll, Stream};
use std::io::{self, Read, Write};
use std::process::Command;
use std::process::Stdio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_codec::{Decoder, FramedRead};
use tokio_process::{Child, ChildStderr, ChildStdin, ChildStdout, CommandExt};
struct Core {
#[allow(dead_code)]
core: Child,
stdout: ChildStdout,
stdin: ChildStdin,
}
impl Read for Core {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stdout.read(buf)
}
}
impl AsyncRead for Core {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.stdout.prepare_uninitialized_buffer(buf)
}
}
impl Write for Core {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stdin.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.stdin.flush()
}
}
impl AsyncWrite for Core {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.stdin.shutdown()
}
}
pub fn spawn<B, F>(executable: &str, builder: B) -> Result<(Client, CoreStderr), ClientError>
where
F: Frontend + 'static + Send,
B: FrontendBuilder<Frontend = F> + 'static,
{
spawn_command(Command::new(executable), builder)
}
pub fn spawn_command<B, F>(
mut command: Command,
builder: B,
) -> Result<(Client, CoreStderr), ClientError>
where
F: Frontend + 'static + Send,
B: FrontendBuilder<Frontend = F> + 'static,
{
info!("starting xi-core");
let mut xi_core = command
.stdout(Stdio::piped())
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.env("RUST_BACKTRACE", "1")
.spawn_async()?;
let stdout = xi_core.stdout().take().unwrap();
let stdin = xi_core.stdin().take().unwrap();
let stderr = xi_core.stderr().take().unwrap();
let core = Core {
core: xi_core,
stdout,
stdin,
};
let (endpoint, client) = Endpoint::new(core, builder);
info!("spawning the Xi-RPC endpoint");
tokio::spawn(endpoint.map_err(|e| error!("Endpoint exited with an error: {:?}", e)));
Ok((Client(client), CoreStderr::new(stderr)))
}
pub struct LineCodec;
impl Decoder for LineCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
let line = buf.split_to(n);
buf.split_to(1);
return match ::std::str::from_utf8(line.as_ref()) {
Ok(s) => Ok(Some(s.to_string())),
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
};
}
Ok(None)
}
}
pub struct CoreStderr(FramedRead<ChildStderr, LineCodec>);
impl CoreStderr {
fn new(stderr: ChildStderr) -> Self {
CoreStderr(FramedRead::new(stderr, LineCodec {}))
}
}
impl Stream for CoreStderr {
type Item = String;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll()
}
}