use std::ffi::OsStr;
use std::io::{Stdin, Stdout};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use nu_protocol::ShellError;
#[cfg(feature = "local-socket")] use nu_protocol::shell_error::io::IoError;
#[cfg(feature = "local-socket")]
mod local_socket;
#[cfg(feature = "local-socket")]
use local_socket::*;
#[derive(Debug, Clone)]
pub enum CommunicationMode {
Stdio,
#[cfg(feature = "local-socket")]
LocalSocket(std::ffi::OsString),
}
impl CommunicationMode {
#[cfg(feature = "local-socket")]
pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
use std::hash::{Hash, Hasher};
use std::time::SystemTime;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
plugin_exe.hash(&mut hasher);
SystemTime::now().hash(&mut hasher);
let unique_id = format!("{:016x}", hasher.finish());
CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
}
pub fn args(&self) -> Vec<&OsStr> {
match self {
CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(path) => {
vec![OsStr::new("--local-socket"), path.as_os_str()]
}
}
}
pub fn setup_command_io(&self, command: &mut Command) {
match self {
CommunicationMode::Stdio => {
command.stdin(Stdio::piped());
command.stdout(Stdio::piped());
}
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(_) => {
command.stdin(Stdio::inherit());
command.stdout(Stdio::inherit());
}
}
}
pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
match self {
CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(name) => {
use interprocess::local_socket::ListenerOptions;
let listener = interpret_local_socket_name(name)
.and_then(|name| ListenerOptions::new().name(name).create_sync())
.map_err(|err| {
IoError::new_internal(
err,
format!(
"Could not interpret local socket name {:?}",
name.to_string_lossy()
),
)
})?;
Ok(PreparedServerCommunication::LocalSocket { listener })
}
}
}
pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
match self {
CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
std::io::stdin(),
std::io::stdout(),
)),
#[cfg(feature = "local-socket")]
CommunicationMode::LocalSocket(name) => {
let get_socket = || {
use interprocess::local_socket as ls;
use ls::traits::Stream;
interpret_local_socket_name(name)
.and_then(|name| ls::Stream::connect(name))
.map_err(|err| {
ShellError::Io(IoError::new_internal(
err,
format!(
"Could not interpret local socket name {:?}",
name.to_string_lossy()
),
))
})
};
let read_in = get_socket()?;
let write_out = get_socket()?;
Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
}
}
}
}
pub enum PreparedServerCommunication {
Stdio,
#[cfg(feature = "local-socket")]
LocalSocket {
listener: interprocess::local_socket::Listener,
},
}
impl PreparedServerCommunication {
pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
match self {
PreparedServerCommunication::Stdio => {
let stdin = child
.stdin
.take()
.ok_or_else(|| ShellError::PluginFailedToLoad {
msg: "Plugin missing stdin writer".into(),
})?;
let stdout = child
.stdout
.take()
.ok_or_else(|| ShellError::PluginFailedToLoad {
msg: "Plugin missing stdout writer".into(),
})?;
Ok(ServerCommunicationIo::Stdio(stdin, stdout))
}
#[cfg(feature = "local-socket")]
PreparedServerCommunication::LocalSocket { listener, .. } => {
use interprocess::local_socket::ListenerNonblockingMode;
use interprocess::local_socket::traits::{Listener, Stream};
use nu_utils::time::Instant;
use std::time::Duration;
const RETRY_PERIOD: Duration = Duration::from_millis(1);
const TIMEOUT: Duration = Duration::from_secs(10);
let start = Instant::now();
listener
.set_nonblocking(ListenerNonblockingMode::Accept)
.map_err(|err| {
IoError::new_internal(
err,
"Could not set non-blocking mode accept for listener",
)
})?;
let mut get_socket = || {
let mut result = None;
while let Ok(None) = child.try_wait() {
match listener.accept() {
Ok(stream) => {
stream.set_nonblocking(false).map_err(|err| {
IoError::new_internal(
err,
"Could not disable non-blocking mode for listener",
)
})?;
result = Some(stream);
break;
}
Err(err) => {
if !is_would_block_err(&err) {
return Err(ShellError::Io(IoError::new_internal(
err,
"Accepting new data from listener failed",
)));
}
}
}
if Instant::now().saturating_duration_since(start) > TIMEOUT {
return Err(ShellError::PluginFailedToLoad {
msg: "Plugin timed out while waiting to connect to socket".into(),
});
} else {
std::thread::sleep(RETRY_PERIOD);
}
}
if let Some(stream) = result {
Ok(stream)
} else {
Err(ShellError::PluginFailedToLoad {
msg: "Plugin exited without connecting".into(),
})
}
};
let write_in = get_socket()?;
let read_out = get_socket()?;
Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
}
}
}
}
pub enum ServerCommunicationIo {
Stdio(ChildStdin, ChildStdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_out: interprocess::local_socket::Stream,
write_in: interprocess::local_socket::Stream,
},
}
pub enum ClientCommunicationIo {
Stdio(Stdin, Stdout),
#[cfg(feature = "local-socket")]
LocalSocket {
read_in: interprocess::local_socket::Stream,
write_out: interprocess::local_socket::Stream,
},
}