Skip to main content

nu_plugin_engine/
init.rs

1use std::{
2    io::{BufReader, BufWriter},
3    path::Path,
4    process::Child,
5    sync::{Arc, Mutex},
6};
7
8#[cfg(unix)]
9use std::os::unix::process::CommandExt;
10#[cfg(windows)]
11use std::os::windows::process::CommandExt;
12
13use nu_plugin_core::{
14    CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
15    ServerCommunicationIo,
16};
17use nu_protocol::{
18    PluginIdentity, PluginRegistryFile, PluginRegistryItem, PluginRegistryItemData,
19    RegisteredPlugin, ShellError, Span, engine::StateWorkingSet, report_shell_error,
20    shell_error::generic::GenericError,
21};
22
23use crate::{
24    PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
25    PluginSource,
26};
27
/// Capacity, in bytes, of the buffers wrapped around the plugin's input and output streams.
///
/// Keep this above the size of the largest commonly sent message so that messages are not
/// needlessly split up. Byte stream buffers are typically 8192 bytes each, so use twice that.
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 2 * 8192;
32
33/// Spawn the command for a plugin, in the given `mode`. After spawning, it can be passed to
34/// [`make_plugin_interface()`] to get a [`PluginInterface`].
35pub fn create_command(
36    path: &Path,
37    mut shell: Option<&Path>,
38    mode: &CommunicationMode,
39) -> std::process::Command {
40    log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");
41
42    let mut shell_args = vec![];
43
44    if shell.is_none() {
45        // We only have to do this for things that are not executable by Rust's Command API on
46        // Windows. They do handle bat/cmd files for us, helpfully.
47        //
48        // Also include anything that wouldn't be executable with a shebang, like JAR files.
49        shell = match path.extension().and_then(|e| e.to_str()) {
50            Some("sh") => {
51                if cfg!(unix) {
52                    // We don't want to override what might be in the shebang if this is Unix, since
53                    // some scripts will have a shebang specifying bash even if they're .sh
54                    None
55                } else {
56                    Some(Path::new("sh"))
57                }
58            }
59            Some("nu") => {
60                shell_args.push("--stdin");
61                Some(Path::new("nu"))
62            }
63            Some("py") => Some(Path::new("python")),
64            Some("rb") => Some(Path::new("ruby")),
65            Some("jar") => {
66                shell_args.push("-jar");
67                Some(Path::new("java"))
68            }
69            _ => None,
70        };
71    }
72
73    let mut process = if let Some(shell) = shell {
74        let mut process = std::process::Command::new(shell);
75        process.args(shell_args);
76        process.arg(path);
77
78        process
79    } else {
80        std::process::Command::new(path)
81    };
82
83    process.args(mode.args());
84
85    // Setup I/O according to the communication mode
86    mode.setup_command_io(&mut process);
87
88    // The plugin should be run in a new process group to prevent Ctrl-C from stopping it
89    #[cfg(unix)]
90    process.process_group(0);
91    #[cfg(windows)]
92    process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
93
94    // In order to make bugs with improper use of filesystem without getting the engine current
95    // directory more obvious, the plugin always starts in the directory of its executable
96    if let Some(dirname) = path.parent() {
97        process.current_dir(dirname);
98    }
99
100    process
101}
102
103/// Create a plugin interface from a spawned child process.
104///
105/// `comm` determines the communication type the process was spawned with, and whether stdio will
106/// be taken from the child.
107pub fn make_plugin_interface(
108    mut child: Child,
109    comm: PreparedServerCommunication,
110    source: Arc<PluginSource>,
111    pid: Option<u32>,
112    gc: Option<PluginGc>,
113) -> Result<PluginInterface, ShellError> {
114    match comm.connect(&mut child)? {
115        ServerCommunicationIo::Stdio(stdin, stdout) => make_plugin_interface_with_streams(
116            stdout,
117            stdin,
118            move || {
119                let _ = child.wait();
120            },
121            source,
122            pid,
123            gc,
124        ),
125        #[cfg(feature = "local-socket")]
126        ServerCommunicationIo::LocalSocket { read_out, write_in } => {
127            make_plugin_interface_with_streams(
128                read_out,
129                write_in,
130                move || {
131                    let _ = child.wait();
132                },
133                source,
134                pid,
135                gc,
136            )
137        }
138    }
139}
140
141/// Create a plugin interface from low-level components.
142///
143/// - `after_close` is called to clean up after the `reader` ends.
144/// - `source` is required so that custom values produced by the plugin can spawn it.
145/// - `pid` may be provided for process management (e.g. `EnterForeground`).
146/// - `gc` may be provided for communication with the plugin's GC (e.g. `SetGcDisabled`).
147pub fn make_plugin_interface_with_streams(
148    mut reader: impl std::io::Read + Send + 'static,
149    writer: impl std::io::Write + Send + 'static,
150    after_close: impl FnOnce() + Send + 'static,
151    source: Arc<PluginSource>,
152    pid: Option<u32>,
153    gc: Option<PluginGc>,
154) -> Result<PluginInterface, ShellError> {
155    let encoder = get_plugin_encoding(&mut reader)?;
156
157    let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
158    let writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);
159
160    let mut manager =
161        PluginInterfaceManager::new(source.clone(), pid, (Mutex::new(writer), encoder));
162    manager.set_garbage_collector(gc);
163
164    let interface = manager.get_interface();
165    interface.hello()?;
166
167    // Spawn the reader on a new thread. We need to be able to read messages at the same time that
168    // we write, because we are expected to be able to handle multiple messages coming in from the
169    // plugin at any time, including stream messages like `Drop`.
170    std::thread::Builder::new()
171        .name(format!(
172            "plugin interface reader ({})",
173            source.identity.name()
174        ))
175        .spawn(move || {
176            if let Err(err) = manager.consume_all((reader, encoder)) {
177                log::warn!("Error in PluginInterfaceManager: {err}");
178            }
179            // If the loop has ended, drop the manager so everyone disconnects and then run
180            // after_close
181            drop(manager);
182            after_close();
183        })
184        .map_err(|err| ShellError::PluginFailedToLoad {
185            msg: format!("Failed to spawn thread for plugin: {err}"),
186        })?;
187
188    Ok(interface)
189}
190
191/// Determine the plugin's encoding from a freshly opened stream.
192///
193/// The plugin is expected to send a 1-byte length and either `json` or `msgpack`, so this reads
194/// that and determines the right length.
195pub fn get_plugin_encoding(
196    child_stdout: &mut impl std::io::Read,
197) -> Result<EncodingType, ShellError> {
198    let mut length_buf = [0u8; 1];
199    child_stdout
200        .read_exact(&mut length_buf)
201        .map_err(|e| ShellError::PluginFailedToLoad {
202            msg: format!("unable to get encoding from plugin: {e}"),
203        })?;
204
205    let mut buf = vec![0u8; length_buf[0] as usize];
206    child_stdout
207        .read_exact(&mut buf)
208        .map_err(|e| ShellError::PluginFailedToLoad {
209            msg: format!("unable to get encoding from plugin: {e}"),
210        })?;
211
212    EncodingType::try_from_bytes(&buf).ok_or_else(|| {
213        let encoding_for_debug = String::from_utf8_lossy(&buf);
214        ShellError::PluginFailedToLoad {
215            msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
216        }
217    })
218}
219
220/// Load the definitions from the plugin file into the engine state
221pub fn load_plugin_file(
222    working_set: &mut StateWorkingSet,
223    plugin_registry_file: &PluginRegistryFile,
224    span: Option<Span>,
225) -> usize {
226    let mut error_count = 0;
227
228    for plugin in &plugin_registry_file.plugins {
229        // Any errors encountered should just be logged.
230        if let Err(err) = load_plugin_registry_item(working_set, plugin, span) {
231            error_count += 1;
232            report_shell_error(None, working_set.permanent_state, &err)
233        }
234    }
235
236    error_count
237}
238
239/// Load a definition from the plugin file into the engine state
240pub fn load_plugin_registry_item(
241    working_set: &mut StateWorkingSet,
242    plugin: &PluginRegistryItem,
243    span: Option<Span>,
244) -> Result<Arc<PersistentPlugin>, ShellError> {
245    let identity =
246        PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()).map_err(|_| {
247            let help = format!(
248                "the filename for `{}` is not a valid nushell plugin: {}",
249                plugin.name,
250                plugin.filename.display()
251            );
252            let error = if let Some(span) = span {
253                GenericError::new(
254                    "Invalid plugin filename in plugin registry file",
255                    "loaded from here",
256                    span,
257                )
258                .with_help(help)
259            } else {
260                GenericError::new_internal(
261                    "Invalid plugin filename in plugin registry file",
262                    "loaded from here",
263                )
264                .with_help(help)
265            };
266            ShellError::Generic(error)
267        })?;
268
269    match &plugin.data {
270        PluginRegistryItemData::Valid { metadata, commands } => {
271            let plugin = add_plugin_to_working_set(working_set, &identity)?;
272
273            // Ensure that the plugin is reset. We're going to load new signatures, so we want to
274            // make sure the running plugin reflects those new signatures, and it's possible that it
275            // doesn't.
276            plugin.reset()?;
277
278            // Set the plugin metadata from the file
279            plugin.set_metadata(Some(metadata.clone()));
280
281            // Create the declarations from the commands
282            for signature in commands {
283                let decl = PluginDeclaration::new(plugin.clone(), signature.clone());
284                working_set.add_decl(Box::new(decl));
285            }
286            Ok(plugin)
287        }
288        PluginRegistryItemData::Invalid => Err(ShellError::PluginRegistryDataInvalid {
289            plugin_name: identity.name().to_owned(),
290            span,
291            add_command: identity.add_command(),
292        }),
293    }
294}
295
296/// Find [`PersistentPlugin`] with the given `identity` in the `working_set`, or construct it
297/// if it doesn't exist.
298///
299/// The garbage collection config is always found and set in either case.
300pub fn add_plugin_to_working_set(
301    working_set: &mut StateWorkingSet,
302    identity: &PluginIdentity,
303) -> Result<Arc<PersistentPlugin>, ShellError> {
304    // Find garbage collection config for the plugin
305    let gc_config = working_set
306        .get_config()
307        .plugin_gc
308        .get(identity.name())
309        .clone();
310
311    // Add it to / get it from the working set
312    let plugin = working_set.find_or_create_plugin(identity, || {
313        Arc::new(PersistentPlugin::new(identity.clone(), gc_config.clone()))
314    });
315
316    plugin.set_gc_config(&gc_config);
317
318    // Downcast the plugin to `PersistentPlugin` - we generally expect this to succeed.
319    // The trait object only exists so that nu-protocol can contain plugins without knowing
320    // anything about their implementation, but we only use `PersistentPlugin` in practice.
321    plugin
322        .as_any()
323        .downcast()
324        .map_err(|_| ShellError::NushellFailed {
325            msg: "encountered unexpected RegisteredPlugin type".into(),
326        })
327}