nu_plugin_core/communication_mode/
mod.rs1use std::ffi::OsStr;
2use std::io::{Stdin, Stdout};
3use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
4
5use nu_protocol::ShellError;
6#[cfg(feature = "local-socket")] use nu_protocol::shell_error::io::IoError;
8
9#[cfg(feature = "local-socket")]
10mod local_socket;
11
12#[cfg(feature = "local-socket")]
13use local_socket::*;
14
/// The transport used for the plugin protocol between the engine and a plugin process.
///
/// A mode is turned into command-line arguments with [`CommunicationMode::args`], prepared on
/// the engine side with [`CommunicationMode::serve`], and opened on the plugin side with
/// [`CommunicationMode::connect_as_client`].
#[derive(Debug, Clone)]
pub enum CommunicationMode {
    /// Exchange messages over the plugin process's piped stdin and stdout.
    Stdio,
    /// Exchange messages over a local socket identified by the contained name.
    ///
    /// Only available when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket(std::ffi::OsString),
}
30
31impl CommunicationMode {
32 #[cfg(feature = "local-socket")]
34 pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
35 use std::hash::{Hash, Hasher};
36 use std::time::SystemTime;
37
38 let mut hasher = std::collections::hash_map::DefaultHasher::new();
42
43 plugin_exe.hash(&mut hasher);
44 SystemTime::now().hash(&mut hasher);
45
46 let unique_id = format!("{:016x}", hasher.finish());
47
48 CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
49 }
50
51 pub fn args(&self) -> Vec<&OsStr> {
52 match self {
53 CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
54 #[cfg(feature = "local-socket")]
55 CommunicationMode::LocalSocket(path) => {
56 vec![OsStr::new("--local-socket"), path.as_os_str()]
57 }
58 }
59 }
60
61 pub fn setup_command_io(&self, command: &mut Command) {
62 match self {
63 CommunicationMode::Stdio => {
64 command.stdin(Stdio::piped());
66 command.stdout(Stdio::piped());
67 }
68 #[cfg(feature = "local-socket")]
69 CommunicationMode::LocalSocket(_) => {
70 command.stdin(Stdio::inherit());
73 command.stdout(Stdio::inherit());
74 }
75 }
76 }
77
78 pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
79 match self {
80 CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
82 #[cfg(feature = "local-socket")]
84 CommunicationMode::LocalSocket(name) => {
85 use interprocess::local_socket::ListenerOptions;
86
87 let listener = interpret_local_socket_name(name)
88 .and_then(|name| ListenerOptions::new().name(name).create_sync())
89 .map_err(|err| {
90 IoError::new_internal(
91 err,
92 format!(
93 "Could not interpret local socket name {:?}",
94 name.to_string_lossy()
95 ),
96 nu_protocol::location!(),
97 )
98 })?;
99 Ok(PreparedServerCommunication::LocalSocket { listener })
100 }
101 }
102 }
103
104 pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
105 match self {
106 CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
107 std::io::stdin(),
108 std::io::stdout(),
109 )),
110 #[cfg(feature = "local-socket")]
111 CommunicationMode::LocalSocket(name) => {
112 let get_socket = || {
114 use interprocess::local_socket as ls;
115 use ls::traits::Stream;
116
117 interpret_local_socket_name(name)
118 .and_then(|name| ls::Stream::connect(name))
119 .map_err(|err| {
120 ShellError::Io(IoError::new_internal(
121 err,
122 format!(
123 "Could not interpret local socket name {:?}",
124 name.to_string_lossy()
125 ),
126 nu_protocol::location!(),
127 ))
128 })
129 };
130 let read_in = get_socket()?;
132 let write_out = get_socket()?;
133 Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
134 }
135 }
136 }
137}
138
/// The server-side state returned by [`CommunicationMode::serve`].
///
/// Call [`connect`](Self::connect) on it to obtain the actual I/O handles for a plugin
/// child process.
pub enum PreparedServerCommunication {
    /// Nothing was prepared; the child's stdin/stdout pipes are used directly.
    Stdio,
    /// A local socket listener on which the plugin's connections are accepted.
    ///
    /// Only available when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// The listener created for the socket name.
        listener: interprocess::local_socket::Listener,
    },
}
153
154impl PreparedServerCommunication {
155 pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
156 match self {
157 PreparedServerCommunication::Stdio => {
158 let stdin = child
159 .stdin
160 .take()
161 .ok_or_else(|| ShellError::PluginFailedToLoad {
162 msg: "Plugin missing stdin writer".into(),
163 })?;
164
165 let stdout = child
166 .stdout
167 .take()
168 .ok_or_else(|| ShellError::PluginFailedToLoad {
169 msg: "Plugin missing stdout writer".into(),
170 })?;
171
172 Ok(ServerCommunicationIo::Stdio(stdin, stdout))
173 }
174 #[cfg(feature = "local-socket")]
175 PreparedServerCommunication::LocalSocket { listener, .. } => {
176 use interprocess::local_socket::traits::{
177 Listener, ListenerNonblockingMode, Stream,
178 };
179 use std::time::{Duration, Instant};
180
181 const RETRY_PERIOD: Duration = Duration::from_millis(1);
182 const TIMEOUT: Duration = Duration::from_secs(10);
183
184 let start = Instant::now();
185
186 listener
191 .set_nonblocking(ListenerNonblockingMode::Accept)
192 .map_err(|err| {
193 IoError::new_internal(
194 err,
195 "Could not set non-blocking mode accept for listener",
196 nu_protocol::location!(),
197 )
198 })?;
199 let mut get_socket = || {
200 let mut result = None;
201 while let Ok(None) = child.try_wait() {
202 match listener.accept() {
203 Ok(stream) => {
204 stream.set_nonblocking(false).map_err(|err| {
207 IoError::new_internal(
208 err,
209 "Could not disable non-blocking mode for listener",
210 nu_protocol::location!(),
211 )
212 })?;
213 result = Some(stream);
214 break;
215 }
216 Err(err) => {
217 if !is_would_block_err(&err) {
218 return Err(ShellError::Io(IoError::new_internal(
221 err,
222 "Accepting new data from listener failed",
223 nu_protocol::location!(),
224 )));
225 }
226 }
227 }
228 if Instant::now().saturating_duration_since(start) > TIMEOUT {
229 return Err(ShellError::PluginFailedToLoad {
230 msg: "Plugin timed out while waiting to connect to socket".into(),
231 });
232 } else {
233 std::thread::sleep(RETRY_PERIOD);
234 }
235 }
236 if let Some(stream) = result {
237 Ok(stream)
238 } else {
239 Err(ShellError::PluginFailedToLoad {
241 msg: "Plugin exited without connecting".into(),
242 })
243 }
244 };
245 let write_in = get_socket()?;
247 let read_out = get_socket()?;
248 Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
249 }
250 }
251 }
252}
253
/// The I/O handles held by the server (engine) side, as returned by
/// [`PreparedServerCommunication::connect`].
pub enum ServerCommunicationIo {
    /// The plugin child's stdin (written to) and stdout (read from) pipes.
    Stdio(ChildStdin, ChildStdout),
    /// A pair of local socket streams.
    ///
    /// Only available when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream on which the plugin's output is read.
        read_out: interprocess::local_socket::Stream,
        /// Stream on which the plugin's input is written.
        write_in: interprocess::local_socket::Stream,
    },
}
263
/// The I/O handles held by the client (plugin) side, as returned by
/// [`CommunicationMode::connect_as_client`].
pub enum ClientCommunicationIo {
    /// The process's own stdin and stdout.
    Stdio(Stdin, Stdout),
    /// A pair of local socket streams.
    ///
    /// Only available when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream on which input is read.
        read_in: interprocess::local_socket::Stream,
        /// Stream on which output is written.
        write_out: interprocess::local_socket::Stream,
    },
}