use log::{debug, error, info, log, trace, warn, Level};
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{BufRead, BufReader};
use std::panic;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};
use crate::bson::{BsonReader, BsonWriter};
use crate::msg::{OkResponse, Request, Response};
use crate::tool::{PluginError, PluginResult};
use crate::PluginInfo;
fn stdin_thread(stdin: ChildStdin, rx_in: Receiver<Request>) -> PluginResult<()> {
let mut writer = BsonWriter::new(stdin);
for request in rx_in {
debug!("stdin: sending {:?}", request);
writer.write(request)?;
}
trace!("stdin: leaving thread");
Ok(())
}
fn stdout_thread(stdout: ChildStdout, tx_out: Sender<Response>) -> PluginResult<()> {
let mut reader = BsonReader::new(stdout);
loop {
match reader.read() {
Ok(Some(response)) => {
trace!("stdout: received {:?}", response);
tx_out.send(response)?;
}
Ok(None) => {
trace!("stdout: end of stream");
break;
}
Err(err) => {
error!("stdout: {:?}", err);
return Err(err.into());
}
}
}
trace!("stdout: leaving thread");
Ok(())
}
fn stderr_thread(stderr: ChildStderr) -> PluginResult<()> {
let mut reader = BufReader::new(stderr);
let mut line_buf = Vec::<u8>::new();
loop {
line_buf.clear();
let n = reader.read_until(0x0A, &mut line_buf)?;
if n > 0 {
let line = String::from_utf8_lossy(&line_buf);
if let Some(msg) = line.strip_prefix("nuts-log-error:") {
error!("[plugin] {}", msg.trim());
} else if let Some(msg) = line.strip_prefix("nuts-log-warn:") {
warn!("[plugin] {}", msg.trim());
} else if let Some(msg) = line.strip_prefix("nuts-log-info:") {
info!("[plugin] {}", msg.trim());
} else if let Some(msg) = line.strip_prefix("nuts-log-debug:") {
debug!("[plugin] {}", msg.trim());
} else if let Some(msg) = line.strip_prefix("nuts-log-trace:") {
trace!("[plugin] {}", msg.trim());
} else {
error!("stderr: {}", line.trim_end());
}
} else {
trace!("stderr: end of stream");
break;
}
}
trace!("stderr: leaving thread");
Ok(())
}
macro_rules! handshake_func {
($name:ident ( $( $argn:ident : $argt:ty ),* ) -> $ty:ty, $req:expr, $variant:pat => $ret:expr) => {
pub fn $name(&mut self, $($argn: $argt),*) -> PluginResult<$ty> {
let response = self.handshake($req).map_err(|err| {
error!("failed message handshake: {}", err);
err
})?;
let result = match response {
Response::Ok($variant) => $ret,
Response::Ok(_) => Err(PluginError::InvalidResponse),
Response::Err(err) => Err(PluginError::Response(err)),
};
if result.is_err() {
self.shutdown();
}
result
}
};
}
macro_rules! join_thread {
($id:literal, $opt:expr) => {
if let Some(handle) = $opt.take() {
debug!("join thread: {}", $id);
if let Err(err) = handle.join() {
panic::resume_unwind(err);
}
} else {
debug!("nothing to join: {}", $id);
}
};
}
#[derive(Debug)]
pub struct PluginConnection {
child: Child,
tx_in: Option<Sender<Request>>,
rx_out: Option<Receiver<Response>>,
t_stdin: Option<JoinHandle<Result<(), PluginError>>>,
t_stdout: Option<JoinHandle<Result<(), PluginError>>>,
t_stderr: Option<JoinHandle<Result<(), PluginError>>>,
}
impl PluginConnection {
pub(crate) fn new(mut child: Child) -> PluginConnection {
let (tx_in, t_stdin) = if let Some(stdin) = child.stdin.take() {
let (tx, rx) = mpsc::channel();
let thr = thread::spawn(move || stdin_thread(stdin, rx));
(Some(tx), Some(thr))
} else {
(None, None)
};
let (rx_out, t_stdout) = if let Some(stdout) = child.stdout.take() {
let (tx, rx) = mpsc::channel();
let thr = thread::spawn(move || stdout_thread(stdout, tx));
(Some(rx), Some(thr))
} else {
(None, None)
};
let t_stderr = if let Some(stderr) = child.stderr.take() {
let thr = thread::spawn(move || stderr_thread(stderr));
Some(thr)
} else {
None
};
PluginConnection {
child,
tx_in,
rx_out,
t_stdin,
t_stdout,
t_stderr,
}
}
handshake_func!(plugin_info() -> PluginInfo, Request::PluginInfo, OkResponse::Map(info) => info.try_into());
handshake_func!(id_string_to_bytes(str: String) -> Vec<u8>, Request::IdToBytes(str), OkResponse::Bytes(bytes) => Ok(bytes));
handshake_func!(id_bytes_to_string(bytes: Vec<u8>) -> String, Request::IdToString(bytes), OkResponse::String(str) => Ok(str));
handshake_func!(settings() -> Vec<u8>, Request::Settings, OkResponse::Bytes(bytes) => Ok(bytes));
handshake_func!(id_size() -> usize, Request::IdSize, OkResponse::Usize(num) => Ok(num));
handshake_func!(block_size() -> u32, Request::BlockSize, OkResponse::U32(num) => Ok(num));
handshake_func!(open(settings: Vec<u8>) -> (), Request::Open(settings), OkResponse::Void => Ok(()));
handshake_func!(create(header: Vec<u8>, overwrite: bool) -> (), Request::Create(header, overwrite), OkResponse::Void => Ok(()));
handshake_func!(info() -> HashMap<String, String>, Request::Info, OkResponse::Map(map) => Ok(map));
handshake_func!(aquire(bytes: Vec<u8>) -> Vec<u8>, Request::Aquire(bytes), OkResponse::Bytes(bytes) => Ok(bytes));
handshake_func!(release(id: Vec<u8>) -> (), Request::Release(id), OkResponse::Void => Ok(()));
handshake_func!(read_header() -> Vec<u8>, Request::ReadHeader, OkResponse::Bytes(bytes) => Ok(bytes));
handshake_func!(write_header(bytes: Vec<u8>) -> (), Request::WriteHeader(bytes), OkResponse::Void => Ok(()));
handshake_func!(read(id: Vec<u8>) -> Vec<u8>, Request::Read(id), OkResponse::Bytes(bytes) => Ok(bytes));
handshake_func!(write(id: Vec<u8>, bytes: Vec<u8>) -> usize, Request::Write(id, bytes), OkResponse::Usize(num) => Ok(num));
handshake_func!(delete() -> (), Request::Delete, OkResponse::Void => Ok(()));
pub fn quit(&mut self) -> PluginResult<()> {
let response = match self.handshake(Request::Quit) {
Ok(response) => Ok(response),
Err(PluginError::ChannelClosed) => {
debug!("quit handshake failed with ChannelClosed, ignoring...");
Ok(Response::ok_void())
}
Err(err) => {
error!("failed quit message handshake: {}", err);
Err(err)
}
}?;
let result = match response {
Response::Ok(OkResponse::Void) => Ok(()),
Response::Ok(_) => Err(PluginError::InvalidResponse),
Response::Err(err) => Err(PluginError::Response(err)),
};
if result.is_err() {
self.shutdown();
}
result
}
fn handshake(&mut self, request: Request) -> PluginResult<Response> {
debug!("handshake requested for {:?}", request);
let tx = self.tx_in.as_mut().ok_or(PluginError::ChannelClosed)?;
let rx = self.rx_out.as_mut().ok_or(PluginError::ChannelClosed)?;
tx.send(request)?;
match rx.recv() {
Ok(response) => Ok(response),
Err(_) => {
self.shutdown();
Err(PluginError::ChannelClosed)
}
}
}
fn shutdown(&mut self) {
if let Some(tx_in) = self.tx_in.take() {
debug!("shutdown plugin connection");
drop(tx_in);
match self.child.wait() {
Ok(exit_status) => {
let level = match exit_status.success() {
true => Level::Debug,
false => Level::Error,
};
log!(level, "plugin exited with {}", exit_status);
}
Err(err) => error!("could not wait for plugin: {}", err),
};
join_thread!("stdin", self.t_stdin);
join_thread!("stdout", self.t_stdout);
join_thread!("stderr", self.t_stderr);
} else {
debug!("already shutdown");
}
}
}
impl Drop for PluginConnection {
fn drop(&mut self) {
self.shutdown()
}
}