1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use crate::client::Client;
use crate::frontend::{Frontend, FrontendBuilder};
use crate::protocol::Endpoint;
use crate::ClientError;
use bytes::BytesMut;
use futures::{Future, Poll, Stream};
use std::io::{self, Read, Write};
use std::process::Command;
use std::process::Stdio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_codec::{Decoder, FramedRead};
use tokio_process::{Child, ChildStderr, ChildStdin, ChildStdout, CommandExt};

struct Core {
    #[allow(dead_code)]
    core: Child,
    stdout: ChildStdout,
    stdin: ChildStdin,
}

impl Read for Core {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stdout.read(buf)
    }
}

impl AsyncRead for Core {
    // FIXME: do I actually have to implement this?
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
        self.stdout.prepare_uninitialized_buffer(buf)
    }
}

impl Write for Core {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.stdin.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.stdin.flush()
    }
}

impl AsyncWrite for Core {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.stdin.shutdown()
    }
}

/// Start Xi core, and spawn an RPC client on the current tokio executor.
///
/// # Panics
///
/// This function calls
/// [`tokio::spawn`](https://docs.rs/tokio/0.1.21/tokio/executor/fn.spawn.html)
/// so it will panic if the default executor is not set or if spawning
/// onto the default executor returns an error.
pub fn spawn<B, F>(executable: &str, builder: B) -> Result<(Client, CoreStderr), ClientError>
where
    F: Frontend + 'static + Send,
    B: FrontendBuilder<Frontend = F> + 'static,
{
    spawn_command(Command::new(executable), builder)
}

/// Same as [`spawn`] but accepts an arbitrary [`std::process::Command`].
pub fn spawn_command<B, F>(
    mut command: Command,
    builder: B,
) -> Result<(Client, CoreStderr), ClientError>
where
    F: Frontend + 'static + Send,
    B: FrontendBuilder<Frontend = F> + 'static,
{
    info!("starting xi-core");
    let mut xi_core = command
        .stdout(Stdio::piped())
        .stdin(Stdio::piped())
        .stderr(Stdio::piped())
        .env("RUST_BACKTRACE", "1")
        .spawn_async()?;

    let stdout = xi_core.stdout().take().unwrap();
    let stdin = xi_core.stdin().take().unwrap();
    let stderr = xi_core.stderr().take().unwrap();
    let core = Core {
        core: xi_core,
        stdout,
        stdin,
    };

    let (endpoint, client) = Endpoint::new(core, builder);

    info!("spawning the Xi-RPC endpoint");
    // XXX: THIS PANICS IF THE DEFAULT EXECUTOR IS NOT SET
    tokio::spawn(endpoint.map_err(|e| error!("Endpoint exited with an error: {:?}", e)));

    Ok((Client(client), CoreStderr::new(stderr)))
}

pub struct LineCodec;

// straight from
// https://github.com/tokio-rs/tokio-line/blob/master/simple/src/lib.rs
impl Decoder for LineCodec {
    type Item = String;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
        if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
            let line = buf.split_to(n);
            buf.split_to(1);
            return match ::std::str::from_utf8(line.as_ref()) {
                Ok(s) => Ok(Some(s.to_string())),
                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
            };
        }
        Ok(None)
    }
}

/// A stream of Xi core stderr lines
pub struct CoreStderr(FramedRead<ChildStderr, LineCodec>);

impl CoreStderr {
    fn new(stderr: ChildStderr) -> Self {
        CoreStderr(FramedRead::new(stderr, LineCodec {}))
    }
}

impl Stream for CoreStderr {
    type Item = String;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.0.poll()
    }
}