nu_plugin_core/communication_mode/
mod.rs1use std::ffi::OsStr;
2use std::io::{Stdin, Stdout};
3use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
4
5use nu_protocol::ShellError;
6#[cfg(feature = "local-socket")] use nu_protocol::shell_error::io::IoError;
8
9#[cfg(feature = "local-socket")]
10mod local_socket;
11
12#[cfg(feature = "local-socket")]
13use local_socket::*;
14
/// How a plugin and the process that launches it exchange data.
///
/// The selected mode decides which command-line arguments are handed to the plugin and how the
/// plugin's standard streams are configured when it is spawned.
#[derive(Debug, Clone)]
pub enum CommunicationMode {
    /// Exchange data over the plugin's `stdin` and `stdout`.
    Stdio,

    /// Exchange data over a local socket; the payload is the socket name.
    ///
    /// Only present when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket(std::ffi::OsString),
}
30
31impl CommunicationMode {
32 #[cfg(feature = "local-socket")]
34 pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
35 use std::hash::{Hash, Hasher};
36 use std::time::SystemTime;
37
38 let mut hasher = std::collections::hash_map::DefaultHasher::new();
42
43 plugin_exe.hash(&mut hasher);
44 SystemTime::now().hash(&mut hasher);
45
46 let unique_id = format!("{:016x}", hasher.finish());
47
48 CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
49 }
50
51 pub fn args(&self) -> Vec<&OsStr> {
52 match self {
53 CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
54 #[cfg(feature = "local-socket")]
55 CommunicationMode::LocalSocket(path) => {
56 vec![OsStr::new("--local-socket"), path.as_os_str()]
57 }
58 }
59 }
60
61 pub fn setup_command_io(&self, command: &mut Command) {
62 match self {
63 CommunicationMode::Stdio => {
64 command.stdin(Stdio::piped());
66 command.stdout(Stdio::piped());
67 }
68 #[cfg(feature = "local-socket")]
69 CommunicationMode::LocalSocket(_) => {
70 command.stdin(Stdio::inherit());
73 command.stdout(Stdio::inherit());
74 }
75 }
76 }
77
78 pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
79 match self {
80 CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
82 #[cfg(feature = "local-socket")]
84 CommunicationMode::LocalSocket(name) => {
85 use interprocess::local_socket::ListenerOptions;
86
87 let listener = interpret_local_socket_name(name)
88 .and_then(|name| ListenerOptions::new().name(name).create_sync())
89 .map_err(|err| {
90 IoError::new_internal(
91 err,
92 format!(
93 "Could not interpret local socket name {:?}",
94 name.to_string_lossy()
95 ),
96 )
97 })?;
98 Ok(PreparedServerCommunication::LocalSocket { listener })
99 }
100 }
101 }
102
103 pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
104 match self {
105 CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
106 std::io::stdin(),
107 std::io::stdout(),
108 )),
109 #[cfg(feature = "local-socket")]
110 CommunicationMode::LocalSocket(name) => {
111 let get_socket = || {
113 use interprocess::local_socket as ls;
114 use ls::traits::Stream;
115
116 interpret_local_socket_name(name)
117 .and_then(|name| ls::Stream::connect(name))
118 .map_err(|err| {
119 ShellError::Io(IoError::new_internal(
120 err,
121 format!(
122 "Could not interpret local socket name {:?}",
123 name.to_string_lossy()
124 ),
125 ))
126 })
127 };
128 let read_in = get_socket()?;
130 let write_out = get_socket()?;
131 Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
132 }
133 }
134 }
135}
136
/// Serving-side state produced by `CommunicationMode::serve()`.
///
/// It is turned into actual I/O streams by `connect()`, which takes the spawned plugin process.
pub enum PreparedServerCommunication {
    /// Nothing to prepare: the streams are taken from the child process itself.
    Stdio,

    /// A listener waiting for the plugin to connect.
    ///
    /// Only present when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Listener on which the plugin's connections are accepted.
        listener: interprocess::local_socket::Listener,
    },
}
151
152impl PreparedServerCommunication {
153 pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
154 match self {
155 PreparedServerCommunication::Stdio => {
156 let stdin = child
157 .stdin
158 .take()
159 .ok_or_else(|| ShellError::PluginFailedToLoad {
160 msg: "Plugin missing stdin writer".into(),
161 })?;
162
163 let stdout = child
164 .stdout
165 .take()
166 .ok_or_else(|| ShellError::PluginFailedToLoad {
167 msg: "Plugin missing stdout writer".into(),
168 })?;
169
170 Ok(ServerCommunicationIo::Stdio(stdin, stdout))
171 }
172 #[cfg(feature = "local-socket")]
173 PreparedServerCommunication::LocalSocket { listener, .. } => {
174 use interprocess::local_socket::ListenerNonblockingMode;
175 use interprocess::local_socket::traits::{Listener, Stream};
176 use nu_utils::time::Instant;
177 use std::time::Duration;
178
179 const RETRY_PERIOD: Duration = Duration::from_millis(1);
180 const TIMEOUT: Duration = Duration::from_secs(10);
181
182 let start = Instant::now();
183
184 listener
189 .set_nonblocking(ListenerNonblockingMode::Accept)
190 .map_err(|err| {
191 IoError::new_internal(
192 err,
193 "Could not set non-blocking mode accept for listener",
194 )
195 })?;
196 let mut get_socket = || {
197 let mut result = None;
198 while let Ok(None) = child.try_wait() {
199 match listener.accept() {
200 Ok(stream) => {
201 stream.set_nonblocking(false).map_err(|err| {
204 IoError::new_internal(
205 err,
206 "Could not disable non-blocking mode for listener",
207 )
208 })?;
209 result = Some(stream);
210 break;
211 }
212 Err(err) => {
213 if !is_would_block_err(&err) {
214 return Err(ShellError::Io(IoError::new_internal(
217 err,
218 "Accepting new data from listener failed",
219 )));
220 }
221 }
222 }
223 if Instant::now().saturating_duration_since(start) > TIMEOUT {
224 return Err(ShellError::PluginFailedToLoad {
225 msg: "Plugin timed out while waiting to connect to socket".into(),
226 });
227 } else {
228 std::thread::sleep(RETRY_PERIOD);
229 }
230 }
231 if let Some(stream) = result {
232 Ok(stream)
233 } else {
234 Err(ShellError::PluginFailedToLoad {
236 msg: "Plugin exited without connecting".into(),
237 })
238 }
239 };
240 let write_in = get_socket()?;
242 let read_out = get_socket()?;
243 Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
244 }
245 }
246 }
247}
248
/// The serving side's I/O streams to a connected plugin, as returned by
/// `PreparedServerCommunication::connect()`.
pub enum ServerCommunicationIo {
    /// The `stdin` and `stdout` handles taken from the plugin's child process.
    Stdio(ChildStdin, ChildStdout),

    /// A pair of local socket streams.
    ///
    /// Only present when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream for reading the plugin's output.
        read_out: interprocess::local_socket::Stream,
        /// Stream for writing the plugin's input.
        write_in: interprocess::local_socket::Stream,
    },
}
258
/// The plugin side's I/O streams to the serving process, as returned by
/// `CommunicationMode::connect_as_client()`.
pub enum ClientCommunicationIo {
    /// The process's own `stdin` and `stdout`.
    Stdio(Stdin, Stdout),

    /// A pair of local socket streams.
    ///
    /// Only present when the `local-socket` feature is enabled.
    #[cfg(feature = "local-socket")]
    LocalSocket {
        /// Stream for reading input.
        read_in: interprocess::local_socket::Stream,
        /// Stream for writing output.
        write_out: interprocess::local_socket::Stream,
    },
}