1use std::{
2 io::{BufReader, BufWriter},
3 path::Path,
4 process::Child,
5 sync::{Arc, Mutex},
6};
7
8#[cfg(unix)]
9use std::os::unix::process::CommandExt;
10#[cfg(windows)]
11use std::os::windows::process::CommandExt;
12
13use nu_plugin_core::{
14 CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
15 ServerCommunicationIo,
16};
17use nu_protocol::{
18 PluginIdentity, PluginRegistryFile, PluginRegistryItem, PluginRegistryItemData,
19 RegisteredPlugin, ShellError, Span, engine::StateWorkingSet, report_shell_error,
20 shell_error::generic::GenericError,
21};
22
23use crate::{
24 PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
25 PluginSource,
26};
27
/// Capacity, in bytes, given to the `BufReader` and `BufWriter` that wrap the
/// plugin's communication streams (16 KiB).
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16 * 1024;
32
33pub fn create_command(
36 path: &Path,
37 mut shell: Option<&Path>,
38 mode: &CommunicationMode,
39) -> std::process::Command {
40 log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");
41
42 let mut shell_args = vec![];
43
44 if shell.is_none() {
45 shell = match path.extension().and_then(|e| e.to_str()) {
50 Some("sh") => {
51 if cfg!(unix) {
52 None
55 } else {
56 Some(Path::new("sh"))
57 }
58 }
59 Some("nu") => {
60 shell_args.push("--stdin");
61 Some(Path::new("nu"))
62 }
63 Some("py") => Some(Path::new("python")),
64 Some("rb") => Some(Path::new("ruby")),
65 Some("jar") => {
66 shell_args.push("-jar");
67 Some(Path::new("java"))
68 }
69 _ => None,
70 };
71 }
72
73 let mut process = if let Some(shell) = shell {
74 let mut process = std::process::Command::new(shell);
75 process.args(shell_args);
76 process.arg(path);
77
78 process
79 } else {
80 std::process::Command::new(path)
81 };
82
83 process.args(mode.args());
84
85 mode.setup_command_io(&mut process);
87
88 #[cfg(unix)]
90 process.process_group(0);
91 #[cfg(windows)]
92 process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
93
94 if let Some(dirname) = path.parent() {
97 process.current_dir(dirname);
98 }
99
100 process
101}
102
103pub fn make_plugin_interface(
108 mut child: Child,
109 comm: PreparedServerCommunication,
110 source: Arc<PluginSource>,
111 pid: Option<u32>,
112 gc: Option<PluginGc>,
113) -> Result<PluginInterface, ShellError> {
114 match comm.connect(&mut child)? {
115 ServerCommunicationIo::Stdio(stdin, stdout) => make_plugin_interface_with_streams(
116 stdout,
117 stdin,
118 move || {
119 let _ = child.wait();
120 },
121 source,
122 pid,
123 gc,
124 ),
125 #[cfg(feature = "local-socket")]
126 ServerCommunicationIo::LocalSocket { read_out, write_in } => {
127 make_plugin_interface_with_streams(
128 read_out,
129 write_in,
130 move || {
131 let _ = child.wait();
132 },
133 source,
134 pid,
135 gc,
136 )
137 }
138 }
139}
140
141pub fn make_plugin_interface_with_streams(
148 mut reader: impl std::io::Read + Send + 'static,
149 writer: impl std::io::Write + Send + 'static,
150 after_close: impl FnOnce() + Send + 'static,
151 source: Arc<PluginSource>,
152 pid: Option<u32>,
153 gc: Option<PluginGc>,
154) -> Result<PluginInterface, ShellError> {
155 let encoder = get_plugin_encoding(&mut reader)?;
156
157 let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
158 let writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);
159
160 let mut manager =
161 PluginInterfaceManager::new(source.clone(), pid, (Mutex::new(writer), encoder));
162 manager.set_garbage_collector(gc);
163
164 let interface = manager.get_interface();
165 interface.hello()?;
166
167 std::thread::Builder::new()
171 .name(format!(
172 "plugin interface reader ({})",
173 source.identity.name()
174 ))
175 .spawn(move || {
176 if let Err(err) = manager.consume_all((reader, encoder)) {
177 log::warn!("Error in PluginInterfaceManager: {err}");
178 }
179 drop(manager);
182 after_close();
183 })
184 .map_err(|err| ShellError::PluginFailedToLoad {
185 msg: format!("Failed to spawn thread for plugin: {err}"),
186 })?;
187
188 Ok(interface)
189}
190
191pub fn get_plugin_encoding(
196 child_stdout: &mut impl std::io::Read,
197) -> Result<EncodingType, ShellError> {
198 let mut length_buf = [0u8; 1];
199 child_stdout
200 .read_exact(&mut length_buf)
201 .map_err(|e| ShellError::PluginFailedToLoad {
202 msg: format!("unable to get encoding from plugin: {e}"),
203 })?;
204
205 let mut buf = vec![0u8; length_buf[0] as usize];
206 child_stdout
207 .read_exact(&mut buf)
208 .map_err(|e| ShellError::PluginFailedToLoad {
209 msg: format!("unable to get encoding from plugin: {e}"),
210 })?;
211
212 EncodingType::try_from_bytes(&buf).ok_or_else(|| {
213 let encoding_for_debug = String::from_utf8_lossy(&buf);
214 ShellError::PluginFailedToLoad {
215 msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
216 }
217 })
218}
219
220pub fn load_plugin_file(
222 working_set: &mut StateWorkingSet,
223 plugin_registry_file: &PluginRegistryFile,
224 span: Option<Span>,
225) -> usize {
226 let mut error_count = 0;
227
228 for plugin in &plugin_registry_file.plugins {
229 if let Err(err) = load_plugin_registry_item(working_set, plugin, span) {
231 error_count += 1;
232 report_shell_error(None, working_set.permanent_state, &err)
233 }
234 }
235
236 error_count
237}
238
239pub fn load_plugin_registry_item(
241 working_set: &mut StateWorkingSet,
242 plugin: &PluginRegistryItem,
243 span: Option<Span>,
244) -> Result<Arc<PersistentPlugin>, ShellError> {
245 let identity =
246 PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()).map_err(|_| {
247 let help = format!(
248 "the filename for `{}` is not a valid nushell plugin: {}",
249 plugin.name,
250 plugin.filename.display()
251 );
252 let error = if let Some(span) = span {
253 GenericError::new(
254 "Invalid plugin filename in plugin registry file",
255 "loaded from here",
256 span,
257 )
258 .with_help(help)
259 } else {
260 GenericError::new_internal(
261 "Invalid plugin filename in plugin registry file",
262 "loaded from here",
263 )
264 .with_help(help)
265 };
266 ShellError::Generic(error)
267 })?;
268
269 match &plugin.data {
270 PluginRegistryItemData::Valid { metadata, commands } => {
271 let plugin = add_plugin_to_working_set(working_set, &identity)?;
272
273 plugin.reset()?;
277
278 plugin.set_metadata(Some(metadata.clone()));
280
281 for signature in commands {
283 let decl = PluginDeclaration::new(plugin.clone(), signature.clone());
284 working_set.add_decl(Box::new(decl));
285 }
286 Ok(plugin)
287 }
288 PluginRegistryItemData::Invalid => Err(ShellError::PluginRegistryDataInvalid {
289 plugin_name: identity.name().to_owned(),
290 span,
291 add_command: identity.add_command(),
292 }),
293 }
294}
295
296pub fn add_plugin_to_working_set(
301 working_set: &mut StateWorkingSet,
302 identity: &PluginIdentity,
303) -> Result<Arc<PersistentPlugin>, ShellError> {
304 let gc_config = working_set
306 .get_config()
307 .plugin_gc
308 .get(identity.name())
309 .clone();
310
311 let plugin = working_set.find_or_create_plugin(identity, || {
313 Arc::new(PersistentPlugin::new(identity.clone(), gc_config.clone()))
314 });
315
316 plugin.set_gc_config(&gc_config);
317
318 plugin
322 .as_any()
323 .downcast()
324 .map_err(|_| ShellError::NushellFailed {
325 msg: "encountered unexpected RegisteredPlugin type".into(),
326 })
327}