use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::process::Child;
use std::thread;
use std::time::Duration;
use serde_json::Value;
use tokio::sync::mpsc;
/// Capacity (in queued messages) of each bounded channel that connects an
/// `ApiProcess` to the background threads servicing the child's stdio.
const CHANNEL_BUFFER: usize = 100;
/// Handle to a child process whose stdin, stdout, and stderr are bridged to
/// channels by background threads, so they can be used from async code.
///
/// The child is killed when this value is dropped.
pub struct ApiProcess {
/// The wrapped child process; its stdio handles are taken in `new`.
child: Child,
/// Sends text to the thread that writes to the child's stdin.
stdin: mpsc::Sender<String>,
/// Receives lines produced by the thread reading the child's stdout.
stdout: mpsc::Receiver<String>,
/// Receives lines produced by the thread reading the child's stderr.
stderr: mpsc::Receiver<String>,
/// Upper bound applied to each async read/write; `None` waits indefinitely.
timeout: Option<Duration>,
}
impl ApiProcess {
/// Wraps `child`, bridging its stdin, stdout, and stderr to bounded channels.
///
/// Three background threads are spawned: one writes every string received
/// on the stdin channel to the child (flushing after each one), and two
/// forward the child's stdout and stderr line by line.
///
/// `timeout` is the optional limit applied by the async read/write methods.
///
/// # Panics
///
/// Panics if the child's stdin, stdout, or stderr handle is not available
/// to be taken.
pub fn new(mut child: Child, timeout: impl Into<Option<Duration>>) -> Self {
    // Spawns a thread that forwards every line read from `reader` (trailing
    // newline included) into a fresh bounded channel and returns the
    // receiving end. The thread stops on EOF, on a read error, or once the
    // receiver has been dropped.
    fn forward_lines<R>(mut reader: R) -> mpsc::Receiver<String>
    where
        R: BufRead + Send + 'static,
    {
        let (line_tx, line_rx) = mpsc::channel::<String>(CHANNEL_BUFFER);
        thread::spawn(move || {
            let mut buf = String::new();
            loop {
                match reader.read_line(&mut buf) {
                    Ok(0) | Err(_) => break,
                    Ok(_) => {}
                }
                // Hand the filled buffer over and continue with an empty one.
                if line_tx.blocking_send(std::mem::take(&mut buf)).is_err() {
                    break;
                }
            }
        });
        line_rx
    }

    // Take all three handles up front so a missing one panics before any
    // thread has been started.
    let mut writer = BufWriter::new(child.stdin.take().expect("Child missing stdin"));
    let stdout_reader = BufReader::new(child.stdout.take().expect("Child missing stdout"));
    let stderr_reader = BufReader::new(child.stderr.take().expect("Child missing stderr"));

    let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(CHANNEL_BUFFER);
    thread::spawn(move || {
        while let Some(payload) = stdin_rx.blocking_recv() {
            let written = writer
                .write_all(payload.as_bytes())
                .and_then(|()| writer.flush());
            if written.is_err() {
                break;
            }
        }
    });

    Self {
        child,
        stdin: stdin_tx,
        stdout: forward_lines(stdout_reader),
        stderr: forward_lines(stderr_reader),
        timeout: timeout.into(),
    }
}
/// Writes `value` to the child's stdin as a single JSON line, then reads
/// one line from its stdout and parses it as JSON.
///
/// # Errors
///
/// Returns the error from the write if it fails; otherwise returns whatever
/// `read_json_from_stdout` yields.
pub async fn write_and_read_json(
    &mut self,
    value: impl Into<Value>,
) -> io::Result<Option<Value>> {
    let sent = self.write_json_to_stdin(value).await;
    match sent {
        Ok(()) => self.read_json_from_stdout().await,
        Err(err) => Err(err),
    }
}
/// Queues `line` for delivery to the child's stdin, appending a trailing
/// `'\n'` when it does not already end with one.
///
/// # Errors
///
/// * `BrokenPipe` if the stdin channel is closed.
/// * `TimedOut` if a timeout is configured and the line could not be queued
///   in time; the error message is whatever stderr output is currently
///   buffered.
pub async fn write_line_to_stdin(&mut self, line: impl Into<String>) -> io::Result<()> {
    let mut payload: String = line.into();
    if !payload.ends_with('\n') {
        payload.push('\n');
    }

    if let Some(limit) = self.timeout {
        let outcome = tokio::time::timeout(limit, self.stdin.send(payload)).await;
        match outcome {
            Ok(sent) => sent.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err)),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::TimedOut,
                self.collect_stderr(),
            )),
        }
    } else {
        let sent = self.stdin.send(payload).await;
        sent.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))
    }
}
/// Serializes `value` to its JSON text form and writes it to the child's
/// stdin as one line via `write_line_to_stdin`.
///
/// # Errors
///
/// Propagates any error from `write_line_to_stdin`.
pub async fn write_json_to_stdin(&mut self, value: impl Into<Value>) -> io::Result<()> {
    let json: Value = value.into();
    let serialized = json.to_string();
    self.write_line_to_stdin(serialized).await
}
pub fn try_read_line_from_stdout(&mut self) -> io::Result<Option<String>> {
match self.stdout.try_recv() {
Ok(line) => Ok(Some(line)),
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Disconnected) => {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
}
}
}
/// Waits for the next line from the child's stdout.
///
/// Yields `Ok(None)` once the stdout channel reports that no further lines
/// will arrive.
///
/// # Errors
///
/// Returns `TimedOut` if a timeout is configured and elapses first; the
/// error message is whatever stderr output is currently buffered.
pub async fn read_line_from_stdout(&mut self) -> io::Result<Option<String>> {
    let limit = match self.timeout {
        Some(limit) => limit,
        // No limit configured: wait for as long as it takes.
        None => return Ok(self.stdout.recv().await),
    };

    let outcome = tokio::time::timeout(limit, self.stdout.recv()).await;
    match outcome {
        Ok(line) => Ok(line),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::TimedOut,
            self.collect_stderr(),
        )),
    }
}
/// Reads one line from the child's stdout and parses it as JSON.
///
/// Yields `Ok(None)` when `read_line_from_stdout` yields no line.
///
/// # Errors
///
/// Propagates errors from `read_line_from_stdout`, and returns
/// `InvalidData` when the line is not valid JSON.
pub async fn read_json_from_stdout(&mut self) -> io::Result<Option<Value>> {
    self.read_line_from_stdout()
        .await?
        .map(|line| {
            serde_json::from_str::<Value>(&line)
                .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
        })
        .transpose()
}
/// Waits for the next line from the child's stderr.
///
/// Yields `Ok(None)` once the stderr channel reports that no further lines
/// will arrive.
///
/// # Errors
///
/// Returns `TimedOut`, wrapping the timer's elapsed error, if a timeout is
/// configured and elapses first.
#[allow(dead_code)]
pub async fn read_line_from_stderr(&mut self) -> io::Result<Option<String>> {
    if let Some(limit) = self.timeout {
        tokio::time::timeout(limit, self.stderr.recv())
            .await
            .map_err(|elapsed| io::Error::new(io::ErrorKind::TimedOut, elapsed))
    } else {
        Ok(self.stderr.recv().await)
    }
}
pub fn try_read_line_from_stderr(&mut self) -> io::Result<Option<String>> {
match self.stderr.try_recv() {
Ok(line) => Ok(Some(line)),
Err(mpsc::error::TryRecvError::Empty) => Ok(None),
Err(mpsc::error::TryRecvError::Disconnected) => {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
}
}
}
pub fn collect_stderr(&mut self) -> String {
let mut stderr = String::new();
while let Ok(Some(line)) = self.try_read_line_from_stderr() {
stderr.push_str(&line);
}
stderr
}
pub fn kill(&mut self) -> io::Result<()> {
self.child.kill()
}
}
impl Drop for ApiProcess {
    /// Kills the child process and reaps it so it does not linger as a
    /// zombie for the remaining lifetime of the parent process.
    ///
    /// All failures are ignored: dropping is best-effort cleanup.
    fn drop(&mut self) {
        match self.kill() {
            // The kill was delivered, so the child is exiting and waiting on
            // it returns promptly while releasing its process-table entry.
            Ok(()) => {
                let _ = self.child.wait();
            }
            // The kill failed (for instance the child had already exited).
            // Reap it if it is finished, but never block inside `drop`.
            Err(_) => {
                let _ = self.child.try_wait();
            }
        }
    }
}