nu_plugin_core/communication_mode/
mod.rs

1use std::ffi::OsStr;
2use std::io::{Stdin, Stdout};
3use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
4
5use nu_protocol::ShellError;
6use nu_protocol::shell_error::io::IoError;
7
8#[cfg(feature = "local-socket")]
9mod local_socket;
10
11#[cfg(feature = "local-socket")]
12use local_socket::*;
13
14/// The type of communication used between the plugin and the engine.
15///
16/// `Stdio` is required to be supported by all plugins, and is attempted initially. If the
17/// `local-socket` feature is enabled and the plugin supports it, `LocalSocket` may be attempted.
18///
19/// Local socket communication has the benefit of not tying up stdio, so it's more compatible with
20/// plugins that want to take user input from the terminal in some way.
21#[derive(Debug, Clone)]
22pub enum CommunicationMode {
23    /// Communicate using `stdin` and `stdout`.
24    Stdio,
25    /// Communicate using an operating system-specific local socket.
26    #[cfg(feature = "local-socket")]
27    LocalSocket(std::ffi::OsString),
28}
29
30impl CommunicationMode {
31    /// Generate a new local socket communication mode based on the given plugin exe path.
32    #[cfg(feature = "local-socket")]
33    pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
34        use std::hash::{Hash, Hasher};
35        use std::time::SystemTime;
36
37        // Generate the unique ID based on the plugin path and the current time. The actual
38        // algorithm here is not very important, we just want this to be relatively unique very
39        // briefly. Using the default hasher in the stdlib means zero extra dependencies.
40        let mut hasher = std::collections::hash_map::DefaultHasher::new();
41
42        plugin_exe.hash(&mut hasher);
43        SystemTime::now().hash(&mut hasher);
44
45        let unique_id = format!("{:016x}", hasher.finish());
46
47        CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
48    }
49
50    pub fn args(&self) -> Vec<&OsStr> {
51        match self {
52            CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
53            #[cfg(feature = "local-socket")]
54            CommunicationMode::LocalSocket(path) => {
55                vec![OsStr::new("--local-socket"), path.as_os_str()]
56            }
57        }
58    }
59
60    pub fn setup_command_io(&self, command: &mut Command) {
61        match self {
62            CommunicationMode::Stdio => {
63                // Both stdout and stdin are piped so we can receive information from the plugin
64                command.stdin(Stdio::piped());
65                command.stdout(Stdio::piped());
66            }
67            #[cfg(feature = "local-socket")]
68            CommunicationMode::LocalSocket(_) => {
69                // Stdio can be used by the plugin to talk to the terminal in local socket mode,
70                // which is the big benefit
71                command.stdin(Stdio::inherit());
72                command.stdout(Stdio::inherit());
73            }
74        }
75    }
76
77    pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
78        match self {
79            // Nothing to set up for stdio - we just take it from the child.
80            CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
81            // For sockets: we need to create the server so that the child won't fail to connect.
82            #[cfg(feature = "local-socket")]
83            CommunicationMode::LocalSocket(name) => {
84                use interprocess::local_socket::ListenerOptions;
85
86                let listener = interpret_local_socket_name(name)
87                    .and_then(|name| ListenerOptions::new().name(name).create_sync())
88                    .map_err(|err| {
89                        IoError::new_internal(
90                            err,
91                            format!(
92                                "Could not interpret local socket name {:?}",
93                                name.to_string_lossy()
94                            ),
95                            nu_protocol::location!(),
96                        )
97                    })?;
98                Ok(PreparedServerCommunication::LocalSocket { listener })
99            }
100        }
101    }
102
103    pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
104        match self {
105            CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
106                std::io::stdin(),
107                std::io::stdout(),
108            )),
109            #[cfg(feature = "local-socket")]
110            CommunicationMode::LocalSocket(name) => {
111                // Connect to the specified socket.
112                let get_socket = || {
113                    use interprocess::local_socket as ls;
114                    use ls::traits::Stream;
115
116                    interpret_local_socket_name(name)
117                        .and_then(|name| ls::Stream::connect(name))
118                        .map_err(|err| {
119                            ShellError::Io(IoError::new_internal(
120                                err,
121                                format!(
122                                    "Could not interpret local socket name {:?}",
123                                    name.to_string_lossy()
124                                ),
125                                nu_protocol::location!(),
126                            ))
127                        })
128                };
129                // Reverse order from the server: read in, write out
130                let read_in = get_socket()?;
131                let write_out = get_socket()?;
132                Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
133            }
134        }
135    }
136}
137
138/// The result of [`CommunicationMode::serve()`], which acts as an intermediate stage for
139/// communication modes that require some kind of socket binding to occur before the client process
140/// can be started. Call [`.connect()`](Self::connect) once the client process has been started.
141///
142/// The socket may be cleaned up on `Drop` if applicable.
143pub enum PreparedServerCommunication {
144    /// Will take stdin and stdout from the process on [`.connect()`](Self::connect).
145    Stdio,
146    /// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`.
147    #[cfg(feature = "local-socket")]
148    LocalSocket {
149        listener: interprocess::local_socket::Listener,
150    },
151}
152
153impl PreparedServerCommunication {
154    pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
155        match self {
156            PreparedServerCommunication::Stdio => {
157                let stdin = child
158                    .stdin
159                    .take()
160                    .ok_or_else(|| ShellError::PluginFailedToLoad {
161                        msg: "Plugin missing stdin writer".into(),
162                    })?;
163
164                let stdout = child
165                    .stdout
166                    .take()
167                    .ok_or_else(|| ShellError::PluginFailedToLoad {
168                        msg: "Plugin missing stdout writer".into(),
169                    })?;
170
171                Ok(ServerCommunicationIo::Stdio(stdin, stdout))
172            }
173            #[cfg(feature = "local-socket")]
174            PreparedServerCommunication::LocalSocket { listener, .. } => {
175                use interprocess::local_socket::traits::{
176                    Listener, ListenerNonblockingMode, Stream,
177                };
178                use std::time::{Duration, Instant};
179
180                const RETRY_PERIOD: Duration = Duration::from_millis(1);
181                const TIMEOUT: Duration = Duration::from_secs(10);
182
183                let start = Instant::now();
184
185                // Use a loop to try to get two clients from the listener: one for read (the plugin
186                // output) and one for write (the plugin input)
187                //
188                // Be non-blocking on Accept only, so we can timeout.
189                listener
190                    .set_nonblocking(ListenerNonblockingMode::Accept)
191                    .map_err(|err| {
192                        IoError::new_internal(
193                            err,
194                            "Could not set non-blocking mode accept for listener",
195                            nu_protocol::location!(),
196                        )
197                    })?;
198                let mut get_socket = || {
199                    let mut result = None;
200                    while let Ok(None) = child.try_wait() {
201                        match listener.accept() {
202                            Ok(stream) => {
203                                // Success! Ensure the stream is in nonblocking mode though, for
204                                // good measure. Had an issue without this on macOS.
205                                stream.set_nonblocking(false).map_err(|err| {
206                                    IoError::new_internal(
207                                        err,
208                                        "Could not disable non-blocking mode for listener",
209                                        nu_protocol::location!(),
210                                    )
211                                })?;
212                                result = Some(stream);
213                                break;
214                            }
215                            Err(err) => {
216                                if !is_would_block_err(&err) {
217                                    // `WouldBlock` is ok, just means it's not ready yet, but some other
218                                    // kind of error should be reported
219                                    return Err(ShellError::Io(IoError::new_internal(
220                                        err,
221                                        "Accepting new data from listener failed",
222                                        nu_protocol::location!(),
223                                    )));
224                                }
225                            }
226                        }
227                        if Instant::now().saturating_duration_since(start) > TIMEOUT {
228                            return Err(ShellError::PluginFailedToLoad {
229                                msg: "Plugin timed out while waiting to connect to socket".into(),
230                            });
231                        } else {
232                            std::thread::sleep(RETRY_PERIOD);
233                        }
234                    }
235                    if let Some(stream) = result {
236                        Ok(stream)
237                    } else {
238                        // The process may have exited
239                        Err(ShellError::PluginFailedToLoad {
240                            msg: "Plugin exited without connecting".into(),
241                        })
242                    }
243                };
244                // Input stream always comes before output
245                let write_in = get_socket()?;
246                let read_out = get_socket()?;
247                Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
248            }
249        }
250    }
251}
252
253/// The required streams for communication from the engine side, i.e. the server in socket terms.
254pub enum ServerCommunicationIo {
255    Stdio(ChildStdin, ChildStdout),
256    #[cfg(feature = "local-socket")]
257    LocalSocket {
258        read_out: interprocess::local_socket::Stream,
259        write_in: interprocess::local_socket::Stream,
260    },
261}
262
263/// The required streams for communication from the plugin side, i.e. the client in socket terms.
264pub enum ClientCommunicationIo {
265    Stdio(Stdin, Stdout),
266    #[cfg(feature = "local-socket")]
267    LocalSocket {
268        read_in: interprocess::local_socket::Stream,
269        write_out: interprocess::local_socket::Stream,
270    },
271}