nu_plugin_core/communication_mode/
mod.rs

1use std::ffi::OsStr;
2use std::io::{Stdin, Stdout};
3use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
4
5use nu_protocol::ShellError;
6#[cfg(feature = "local-socket")] // unused without that feature
7use nu_protocol::shell_error::io::IoError;
8
9#[cfg(feature = "local-socket")]
10mod local_socket;
11
12#[cfg(feature = "local-socket")]
13use local_socket::*;
14
15/// The type of communication used between the plugin and the engine.
16///
17/// `Stdio` is required to be supported by all plugins, and is attempted initially. If the
18/// `local-socket` feature is enabled and the plugin supports it, `LocalSocket` may be attempted.
19///
20/// Local socket communication has the benefit of not tying up stdio, so it's more compatible with
21/// plugins that want to take user input from the terminal in some way.
22#[derive(Debug, Clone)]
23pub enum CommunicationMode {
24    /// Communicate using `stdin` and `stdout`.
25    Stdio,
26    /// Communicate using an operating system-specific local socket.
27    #[cfg(feature = "local-socket")]
28    LocalSocket(std::ffi::OsString),
29}
30
31impl CommunicationMode {
32    /// Generate a new local socket communication mode based on the given plugin exe path.
33    #[cfg(feature = "local-socket")]
34    pub fn local_socket(plugin_exe: &std::path::Path) -> CommunicationMode {
35        use std::hash::{Hash, Hasher};
36        use std::time::SystemTime;
37
38        // Generate the unique ID based on the plugin path and the current time. The actual
39        // algorithm here is not very important, we just want this to be relatively unique very
40        // briefly. Using the default hasher in the stdlib means zero extra dependencies.
41        let mut hasher = std::collections::hash_map::DefaultHasher::new();
42
43        plugin_exe.hash(&mut hasher);
44        SystemTime::now().hash(&mut hasher);
45
46        let unique_id = format!("{:016x}", hasher.finish());
47
48        CommunicationMode::LocalSocket(make_local_socket_name(&unique_id))
49    }
50
51    pub fn args(&self) -> Vec<&OsStr> {
52        match self {
53            CommunicationMode::Stdio => vec![OsStr::new("--stdio")],
54            #[cfg(feature = "local-socket")]
55            CommunicationMode::LocalSocket(path) => {
56                vec![OsStr::new("--local-socket"), path.as_os_str()]
57            }
58        }
59    }
60
61    pub fn setup_command_io(&self, command: &mut Command) {
62        match self {
63            CommunicationMode::Stdio => {
64                // Both stdout and stdin are piped so we can receive information from the plugin
65                command.stdin(Stdio::piped());
66                command.stdout(Stdio::piped());
67            }
68            #[cfg(feature = "local-socket")]
69            CommunicationMode::LocalSocket(_) => {
70                // Stdio can be used by the plugin to talk to the terminal in local socket mode,
71                // which is the big benefit
72                command.stdin(Stdio::inherit());
73                command.stdout(Stdio::inherit());
74            }
75        }
76    }
77
78    pub fn serve(&self) -> Result<PreparedServerCommunication, ShellError> {
79        match self {
80            // Nothing to set up for stdio - we just take it from the child.
81            CommunicationMode::Stdio => Ok(PreparedServerCommunication::Stdio),
82            // For sockets: we need to create the server so that the child won't fail to connect.
83            #[cfg(feature = "local-socket")]
84            CommunicationMode::LocalSocket(name) => {
85                use interprocess::local_socket::ListenerOptions;
86
87                let listener = interpret_local_socket_name(name)
88                    .and_then(|name| ListenerOptions::new().name(name).create_sync())
89                    .map_err(|err| {
90                        IoError::new_internal(
91                            err,
92                            format!(
93                                "Could not interpret local socket name {:?}",
94                                name.to_string_lossy()
95                            ),
96                            nu_protocol::location!(),
97                        )
98                    })?;
99                Ok(PreparedServerCommunication::LocalSocket { listener })
100            }
101        }
102    }
103
104    pub fn connect_as_client(&self) -> Result<ClientCommunicationIo, ShellError> {
105        match self {
106            CommunicationMode::Stdio => Ok(ClientCommunicationIo::Stdio(
107                std::io::stdin(),
108                std::io::stdout(),
109            )),
110            #[cfg(feature = "local-socket")]
111            CommunicationMode::LocalSocket(name) => {
112                // Connect to the specified socket.
113                let get_socket = || {
114                    use interprocess::local_socket as ls;
115                    use ls::traits::Stream;
116
117                    interpret_local_socket_name(name)
118                        .and_then(|name| ls::Stream::connect(name))
119                        .map_err(|err| {
120                            ShellError::Io(IoError::new_internal(
121                                err,
122                                format!(
123                                    "Could not interpret local socket name {:?}",
124                                    name.to_string_lossy()
125                                ),
126                                nu_protocol::location!(),
127                            ))
128                        })
129                };
130                // Reverse order from the server: read in, write out
131                let read_in = get_socket()?;
132                let write_out = get_socket()?;
133                Ok(ClientCommunicationIo::LocalSocket { read_in, write_out })
134            }
135        }
136    }
137}
138
139/// The result of [`CommunicationMode::serve()`], which acts as an intermediate stage for
140/// communication modes that require some kind of socket binding to occur before the client process
141/// can be started. Call [`.connect()`](Self::connect) once the client process has been started.
142///
143/// The socket may be cleaned up on `Drop` if applicable.
144pub enum PreparedServerCommunication {
145    /// Will take stdin and stdout from the process on [`.connect()`](Self::connect).
146    Stdio,
147    /// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`.
148    #[cfg(feature = "local-socket")]
149    LocalSocket {
150        listener: interprocess::local_socket::Listener,
151    },
152}
153
154impl PreparedServerCommunication {
155    pub fn connect(&self, child: &mut Child) -> Result<ServerCommunicationIo, ShellError> {
156        match self {
157            PreparedServerCommunication::Stdio => {
158                let stdin = child
159                    .stdin
160                    .take()
161                    .ok_or_else(|| ShellError::PluginFailedToLoad {
162                        msg: "Plugin missing stdin writer".into(),
163                    })?;
164
165                let stdout = child
166                    .stdout
167                    .take()
168                    .ok_or_else(|| ShellError::PluginFailedToLoad {
169                        msg: "Plugin missing stdout writer".into(),
170                    })?;
171
172                Ok(ServerCommunicationIo::Stdio(stdin, stdout))
173            }
174            #[cfg(feature = "local-socket")]
175            PreparedServerCommunication::LocalSocket { listener, .. } => {
176                use interprocess::local_socket::traits::{
177                    Listener, ListenerNonblockingMode, Stream,
178                };
179                use std::time::{Duration, Instant};
180
181                const RETRY_PERIOD: Duration = Duration::from_millis(1);
182                const TIMEOUT: Duration = Duration::from_secs(10);
183
184                let start = Instant::now();
185
186                // Use a loop to try to get two clients from the listener: one for read (the plugin
187                // output) and one for write (the plugin input)
188                //
189                // Be non-blocking on Accept only, so we can timeout.
190                listener
191                    .set_nonblocking(ListenerNonblockingMode::Accept)
192                    .map_err(|err| {
193                        IoError::new_internal(
194                            err,
195                            "Could not set non-blocking mode accept for listener",
196                            nu_protocol::location!(),
197                        )
198                    })?;
199                let mut get_socket = || {
200                    let mut result = None;
201                    while let Ok(None) = child.try_wait() {
202                        match listener.accept() {
203                            Ok(stream) => {
204                                // Success! Ensure the stream is in nonblocking mode though, for
205                                // good measure. Had an issue without this on macOS.
206                                stream.set_nonblocking(false).map_err(|err| {
207                                    IoError::new_internal(
208                                        err,
209                                        "Could not disable non-blocking mode for listener",
210                                        nu_protocol::location!(),
211                                    )
212                                })?;
213                                result = Some(stream);
214                                break;
215                            }
216                            Err(err) => {
217                                if !is_would_block_err(&err) {
218                                    // `WouldBlock` is ok, just means it's not ready yet, but some other
219                                    // kind of error should be reported
220                                    return Err(ShellError::Io(IoError::new_internal(
221                                        err,
222                                        "Accepting new data from listener failed",
223                                        nu_protocol::location!(),
224                                    )));
225                                }
226                            }
227                        }
228                        if Instant::now().saturating_duration_since(start) > TIMEOUT {
229                            return Err(ShellError::PluginFailedToLoad {
230                                msg: "Plugin timed out while waiting to connect to socket".into(),
231                            });
232                        } else {
233                            std::thread::sleep(RETRY_PERIOD);
234                        }
235                    }
236                    if let Some(stream) = result {
237                        Ok(stream)
238                    } else {
239                        // The process may have exited
240                        Err(ShellError::PluginFailedToLoad {
241                            msg: "Plugin exited without connecting".into(),
242                        })
243                    }
244                };
245                // Input stream always comes before output
246                let write_in = get_socket()?;
247                let read_out = get_socket()?;
248                Ok(ServerCommunicationIo::LocalSocket { read_out, write_in })
249            }
250        }
251    }
252}
253
254/// The required streams for communication from the engine side, i.e. the server in socket terms.
255pub enum ServerCommunicationIo {
256    Stdio(ChildStdin, ChildStdout),
257    #[cfg(feature = "local-socket")]
258    LocalSocket {
259        read_out: interprocess::local_socket::Stream,
260        write_in: interprocess::local_socket::Stream,
261    },
262}
263
264/// The required streams for communication from the plugin side, i.e. the client in socket terms.
265pub enum ClientCommunicationIo {
266    Stdio(Stdin, Stdout),
267    #[cfg(feature = "local-socket")]
268    LocalSocket {
269        read_in: interprocess::local_socket::Stream,
270        write_out: interprocess::local_socket::Stream,
271    },
272}