nu_plugin_engine/
init.rs

1use std::{
2    io::{BufReader, BufWriter},
3    path::Path,
4    process::Child,
5    sync::{Arc, Mutex},
6};
7
8#[cfg(unix)]
9use std::os::unix::process::CommandExt;
10#[cfg(windows)]
11use std::os::windows::process::CommandExt;
12
13use nu_plugin_core::{
14    CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
15    ServerCommunicationIo,
16};
17use nu_protocol::{
18    PluginIdentity, PluginRegistryFile, PluginRegistryItem, PluginRegistryItemData,
19    RegisteredPlugin, ShellError, Span, engine::StateWorkingSet, report_shell_error,
20};
21
22use crate::{
23    PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
24    PluginSource,
25};
26
27/// This should be larger than the largest commonly sent message to avoid excessive fragmentation.
28///
29/// The buffers coming from byte streams are typically each 8192 bytes, so double that.
30pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16384;
31
32/// Spawn the command for a plugin, in the given `mode`. After spawning, it can be passed to
33/// [`make_plugin_interface()`] to get a [`PluginInterface`].
34pub fn create_command(
35    path: &Path,
36    mut shell: Option<&Path>,
37    mode: &CommunicationMode,
38) -> std::process::Command {
39    log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");
40
41    let mut shell_args = vec![];
42
43    if shell.is_none() {
44        // We only have to do this for things that are not executable by Rust's Command API on
45        // Windows. They do handle bat/cmd files for us, helpfully.
46        //
47        // Also include anything that wouldn't be executable with a shebang, like JAR files.
48        shell = match path.extension().and_then(|e| e.to_str()) {
49            Some("sh") => {
50                if cfg!(unix) {
51                    // We don't want to override what might be in the shebang if this is Unix, since
52                    // some scripts will have a shebang specifying bash even if they're .sh
53                    None
54                } else {
55                    Some(Path::new("sh"))
56                }
57            }
58            Some("nu") => {
59                shell_args.push("--stdin");
60                Some(Path::new("nu"))
61            }
62            Some("py") => Some(Path::new("python")),
63            Some("rb") => Some(Path::new("ruby")),
64            Some("jar") => {
65                shell_args.push("-jar");
66                Some(Path::new("java"))
67            }
68            _ => None,
69        };
70    }
71
72    let mut process = if let Some(shell) = shell {
73        let mut process = std::process::Command::new(shell);
74        process.args(shell_args);
75        process.arg(path);
76
77        process
78    } else {
79        std::process::Command::new(path)
80    };
81
82    process.args(mode.args());
83
84    // Setup I/O according to the communication mode
85    mode.setup_command_io(&mut process);
86
87    // The plugin should be run in a new process group to prevent Ctrl-C from stopping it
88    #[cfg(unix)]
89    process.process_group(0);
90    #[cfg(windows)]
91    process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
92
93    // In order to make bugs with improper use of filesystem without getting the engine current
94    // directory more obvious, the plugin always starts in the directory of its executable
95    if let Some(dirname) = path.parent() {
96        process.current_dir(dirname);
97    }
98
99    process
100}
101
102/// Create a plugin interface from a spawned child process.
103///
104/// `comm` determines the communication type the process was spawned with, and whether stdio will
105/// be taken from the child.
106pub fn make_plugin_interface(
107    mut child: Child,
108    comm: PreparedServerCommunication,
109    source: Arc<PluginSource>,
110    pid: Option<u32>,
111    gc: Option<PluginGc>,
112) -> Result<PluginInterface, ShellError> {
113    match comm.connect(&mut child)? {
114        ServerCommunicationIo::Stdio(stdin, stdout) => make_plugin_interface_with_streams(
115            stdout,
116            stdin,
117            move || {
118                let _ = child.wait();
119            },
120            source,
121            pid,
122            gc,
123        ),
124        #[cfg(feature = "local-socket")]
125        ServerCommunicationIo::LocalSocket { read_out, write_in } => {
126            make_plugin_interface_with_streams(
127                read_out,
128                write_in,
129                move || {
130                    let _ = child.wait();
131                },
132                source,
133                pid,
134                gc,
135            )
136        }
137    }
138}
139
140/// Create a plugin interface from low-level components.
141///
142/// - `after_close` is called to clean up after the `reader` ends.
143/// - `source` is required so that custom values produced by the plugin can spawn it.
144/// - `pid` may be provided for process management (e.g. `EnterForeground`).
145/// - `gc` may be provided for communication with the plugin's GC (e.g. `SetGcDisabled`).
146pub fn make_plugin_interface_with_streams(
147    mut reader: impl std::io::Read + Send + 'static,
148    writer: impl std::io::Write + Send + 'static,
149    after_close: impl FnOnce() + Send + 'static,
150    source: Arc<PluginSource>,
151    pid: Option<u32>,
152    gc: Option<PluginGc>,
153) -> Result<PluginInterface, ShellError> {
154    let encoder = get_plugin_encoding(&mut reader)?;
155
156    let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
157    let writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);
158
159    let mut manager =
160        PluginInterfaceManager::new(source.clone(), pid, (Mutex::new(writer), encoder));
161    manager.set_garbage_collector(gc);
162
163    let interface = manager.get_interface();
164    interface.hello()?;
165
166    // Spawn the reader on a new thread. We need to be able to read messages at the same time that
167    // we write, because we are expected to be able to handle multiple messages coming in from the
168    // plugin at any time, including stream messages like `Drop`.
169    std::thread::Builder::new()
170        .name(format!(
171            "plugin interface reader ({})",
172            source.identity.name()
173        ))
174        .spawn(move || {
175            if let Err(err) = manager.consume_all((reader, encoder)) {
176                log::warn!("Error in PluginInterfaceManager: {err}");
177            }
178            // If the loop has ended, drop the manager so everyone disconnects and then run
179            // after_close
180            drop(manager);
181            after_close();
182        })
183        .map_err(|err| ShellError::PluginFailedToLoad {
184            msg: format!("Failed to spawn thread for plugin: {err}"),
185        })?;
186
187    Ok(interface)
188}
189
190/// Determine the plugin's encoding from a freshly opened stream.
191///
192/// The plugin is expected to send a 1-byte length and either `json` or `msgpack`, so this reads
193/// that and determines the right length.
194pub fn get_plugin_encoding(
195    child_stdout: &mut impl std::io::Read,
196) -> Result<EncodingType, ShellError> {
197    let mut length_buf = [0u8; 1];
198    child_stdout
199        .read_exact(&mut length_buf)
200        .map_err(|e| ShellError::PluginFailedToLoad {
201            msg: format!("unable to get encoding from plugin: {e}"),
202        })?;
203
204    let mut buf = vec![0u8; length_buf[0] as usize];
205    child_stdout
206        .read_exact(&mut buf)
207        .map_err(|e| ShellError::PluginFailedToLoad {
208            msg: format!("unable to get encoding from plugin: {e}"),
209        })?;
210
211    EncodingType::try_from_bytes(&buf).ok_or_else(|| {
212        let encoding_for_debug = String::from_utf8_lossy(&buf);
213        ShellError::PluginFailedToLoad {
214            msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
215        }
216    })
217}
218
219/// Load the definitions from the plugin file into the engine state
220pub fn load_plugin_file(
221    working_set: &mut StateWorkingSet,
222    plugin_registry_file: &PluginRegistryFile,
223    span: Option<Span>,
224) {
225    for plugin in &plugin_registry_file.plugins {
226        // Any errors encountered should just be logged.
227        if let Err(err) = load_plugin_registry_item(working_set, plugin, span) {
228            report_shell_error(working_set.permanent_state, &err)
229        }
230    }
231}
232
233/// Load a definition from the plugin file into the engine state
234pub fn load_plugin_registry_item(
235    working_set: &mut StateWorkingSet,
236    plugin: &PluginRegistryItem,
237    span: Option<Span>,
238) -> Result<Arc<PersistentPlugin>, ShellError> {
239    let identity =
240        PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()).map_err(|_| {
241            ShellError::GenericError {
242                error: "Invalid plugin filename in plugin registry file".into(),
243                msg: "loaded from here".into(),
244                span,
245                help: Some(format!(
246                    "the filename for `{}` is not a valid nushell plugin: {}",
247                    plugin.name,
248                    plugin.filename.display()
249                )),
250                inner: vec![],
251            }
252        })?;
253
254    match &plugin.data {
255        PluginRegistryItemData::Valid { metadata, commands } => {
256            let plugin = add_plugin_to_working_set(working_set, &identity)?;
257
258            // Ensure that the plugin is reset. We're going to load new signatures, so we want to
259            // make sure the running plugin reflects those new signatures, and it's possible that it
260            // doesn't.
261            plugin.reset()?;
262
263            // Set the plugin metadata from the file
264            plugin.set_metadata(Some(metadata.clone()));
265
266            // Create the declarations from the commands
267            for signature in commands {
268                let decl = PluginDeclaration::new(plugin.clone(), signature.clone());
269                working_set.add_decl(Box::new(decl));
270            }
271            Ok(plugin)
272        }
273        PluginRegistryItemData::Invalid => Err(ShellError::PluginRegistryDataInvalid {
274            plugin_name: identity.name().to_owned(),
275            span,
276            add_command: identity.add_command(),
277        }),
278    }
279}
280
281/// Find [`PersistentPlugin`] with the given `identity` in the `working_set`, or construct it
282/// if it doesn't exist.
283///
284/// The garbage collection config is always found and set in either case.
285pub fn add_plugin_to_working_set(
286    working_set: &mut StateWorkingSet,
287    identity: &PluginIdentity,
288) -> Result<Arc<PersistentPlugin>, ShellError> {
289    // Find garbage collection config for the plugin
290    let gc_config = working_set
291        .get_config()
292        .plugin_gc
293        .get(identity.name())
294        .clone();
295
296    // Add it to / get it from the working set
297    let plugin = working_set.find_or_create_plugin(identity, || {
298        Arc::new(PersistentPlugin::new(identity.clone(), gc_config.clone()))
299    });
300
301    plugin.set_gc_config(&gc_config);
302
303    // Downcast the plugin to `PersistentPlugin` - we generally expect this to succeed.
304    // The trait object only exists so that nu-protocol can contain plugins without knowing
305    // anything about their implementation, but we only use `PersistentPlugin` in practice.
306    plugin
307        .as_any()
308        .downcast()
309        .map_err(|_| ShellError::NushellFailed {
310            msg: "encountered unexpected RegisteredPlugin type".into(),
311        })
312}