nu_plugin_core/communication_mode/
mod.rs1use std::ffi::OsStr;
2use std::io::{Stdin, Stdout};
3use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
4
5use nu_protocol::ShellError;
6use nu_protocol::shell_error::io::IoError;
7
8#[cfg(feature = "local-socket")]
9mod local_socket;
10
11#[cfg(feature = "local-socket")]
12use local_socket::*;
13
/// The channel used to exchange data with a plugin process.
///
/// The selected mode decides which command-line arguments the plugin is given (see
/// [`CommunicationMode::args`]) and how the standard streams of the spawned command are
/// configured (see [`CommunicationMode::setup_command_io`]).
#[derive(Debug, Clone)]
pub enum CommunicationMode {
    /// Communicate over the plugin's piped stdin and stdout.
    Stdio,
    /// Communicate over a local socket.
    ///
    /// The payload is the socket name; it is handed to the plugin after the `--local-socket`
    /// argument and resolved with `interpret_local_socket_name` before use.
    #[cfg(feature = "local-socket")]
    LocalSocket(std::ffi::OsString),
}
29
30impl CommunicationMode {
31 #[cfg(feature = "local-socket")]
33 pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
34 use std::hash::{Hash, Hasher};
35 use std::time::SystemTime;
36
37 let mut hasher = std::collections::hash_map::DefaultHasher::new();
41
42 plugin_exe.hash(&mut hasher);
43 SystemTime::now().hash(&mut hasher);
44
45 let unique_id = format!("{:016x}", hasher.finish());
46
47 CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
48 }
49
50 pub fn args(&self) -> Vec<&OsStr> {
51 match self {
52 CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
53 #[cfg(feature = "local-socket")]
54 CommunicationMode::LocalSocket(path) => {
55 vec![OsStr::new("--local-socket"), path.as_os_str()]
56 }
57 }
58 }
59
60 pub fn setup_command_io(&self, command: &mut Command) {
61 match self {
62 CommunicationMode::Stdio => {
63 command.stdin(Stdio::piped());
65 command.stdout(Stdio::piped());
66 }
67 #[cfg(feature = "local-socket")]
68 CommunicationMode::LocalSocket(_) => {
69 command.stdin(Stdio::inherit());
72 command.stdout(Stdio::inherit());
73 }
74 }
75 }
76
77 pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
78 match self {
79 CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
81 #[cfg(feature = "local-socket")]
83 CommunicationMode::LocalSocket(name) => {
84 use interprocess::local_socket::ListenerOptions;
85
86 let listener = interpret_local_socket_name(name)
87 .and_then(|name| ListenerOptions::new().name(name).create_sync())
88 .map_err(|err| {
89 IoError::new_internal(
90 err,
91 format!(
92 "Could not interpret local socket name {:?}",
93 name.to_string_lossy()
94 ),
95 nu_protocol::location!(),
96 )
97 })?;
98 Ok(PreparedServerCommunication::LocalSocket { listener })
99 }
100 }
101 }
102
103 pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
104 match self {
105 CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
106 std::io::stdin(),
107 std::io::stdout(),
108 )),
109 #[cfg(feature = "local-socket")]
110 CommunicationMode::LocalSocket(name) => {
111 let get_socket = || {
113 use interprocess::local_socket as ls;
114 use ls::traits::Stream;
115
116 interpret_local_socket_name(name)
117 .and_then(|name| ls::Stream::connect(name))
118 .map_err(|err| {
119 ShellError::Io(IoError::new_internal(
120 err,
121 format!(
122 "Could not interpret local socket name {:?}",
123 name.to_string_lossy()
124 ),
125 nu_protocol::location!(),
126 ))
127 })
128 };
129 let read_in = get_socket()?;
131 let write_out = get_socket()?;
132 Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
133 }
134 }
135 }
136}
137
/// The serving side of a communication channel after preparation but before the plugin has
/// connected.
///
/// Produced by [`CommunicationMode::serve`] and turned into a [`ServerCommunicationIo`] by
/// [`PreparedServerCommunication::connect`].
pub enum PreparedServerCommunication {
    /// Stdio mode; nothing has to be kept around until the child is connected.
    Stdio,
    /// Local socket mode; holds the listener the plugin is expected to connect to.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Listener created for the socket name of the communication mode.
        listener: interprocess::local_socket::Listener,
    },
}
152
153impl PreparedServerCommunication {
154 pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
155 match self {
156 PreparedServerCommunication::Stdio => {
157 let stdin = child
158 .stdin
159 .take()
160 .ok_or_else(|| ShellError::PluginFailedToLoad {
161 msg: "Plugin missing stdin writer".into(),
162 })?;
163
164 let stdout = child
165 .stdout
166 .take()
167 .ok_or_else(|| ShellError::PluginFailedToLoad {
168 msg: "Plugin missing stdout writer".into(),
169 })?;
170
171 Ok(ServerCommunicationIo::Stdio(stdin, stdout))
172 }
173 #[cfg(feature = "local-socket")]
174 PreparedServerCommunication::LocalSocket { listener, .. } => {
175 use interprocess::local_socket::traits::{
176 Listener, ListenerNonblockingMode, Stream,
177 };
178 use std::time::{Duration, Instant};
179
180 const RETRY_PERIOD: Duration = Duration::from_millis(1);
181 const TIMEOUT: Duration = Duration::from_secs(10);
182
183 let start = Instant::now();
184
185 listener
190 .set_nonblocking(ListenerNonblockingMode::Accept)
191 .map_err(|err| {
192 IoError::new_internal(
193 err,
194 "Could not set non-blocking mode accept for listener",
195 nu_protocol::location!(),
196 )
197 })?;
198 let mut get_socket = || {
199 let mut result = None;
200 while let Ok(None) = child.try_wait() {
201 match listener.accept() {
202 Ok(stream) => {
203 stream.set_nonblocking(false).map_err(|err| {
206 IoError::new_internal(
207 err,
208 "Could not disable non-blocking mode for listener",
209 nu_protocol::location!(),
210 )
211 })?;
212 result = Some(stream);
213 break;
214 }
215 Err(err) => {
216 if !is_would_block_err(&err) {
217 return Err(ShellError::Io(IoError::new_internal(
220 err,
221 "Accepting new data from listener failed",
222 nu_protocol::location!(),
223 )));
224 }
225 }
226 }
227 if Instant::now().saturating_duration_since(start) > TIMEOUT {
228 return Err(ShellError::PluginFailedToLoad {
229 msg: "Plugin timed out while waiting to connect to socket".into(),
230 });
231 } else {
232 std::thread::sleep(RETRY_PERIOD);
233 }
234 }
235 if let Some(stream) = result {
236 Ok(stream)
237 } else {
238 Err(ShellError::PluginFailedToLoad {
240 msg: "Plugin exited without connecting".into(),
241 })
242 }
243 };
244 let write_in = get_socket()?;
246 let read_out = get_socket()?;
247 Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
248 }
249 }
250 }
251}
252
/// The connected I/O handles held by the side that spawned the plugin process.
///
/// Returned by [`PreparedServerCommunication::connect`].
pub enum ServerCommunicationIo {
    /// The child's piped stdin (written to) and stdout (read from).
    Stdio(ChildStdin, ChildStdout),
    /// Two local socket streams, one per direction.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream accepted second; named for reading the plugin's output.
        read_out: interprocess::local_socket::Stream,
        /// Stream accepted first; named for writing the plugin's input.
        write_in: interprocess::local_socket::Stream,
    },
}
262
/// The connected I/O handles held by the client side of the channel.
///
/// Returned by [`CommunicationMode::connect_as_client`].
pub enum ClientCommunicationIo {
    /// The current process's own stdin and stdout.
    Stdio(Stdin, Stdout),
    /// Two local socket streams, one per direction.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream connected first; named for reading incoming data.
        read_in: interprocess::local_socket::Stream,
        /// Stream connected second; named for writing outgoing data.
        write_out: interprocess::local_socket::Stream,
    },
}