1use std::{
2 io::{BufReader, BufWriter},
3 path::Path,
4 process::Child,
5 sync::{Arc, Mutex},
6};
7
8#[cfg(unix)]
9use std::os::unix::process::CommandExt;
10#[cfg(windows)]
11use std::os::windows::process::CommandExt;
12
13use nu_plugin_core::{
14 CommunicationMode, EncodingType, InterfaceManager, PreparedServerCommunication,
15 ServerCommunicationIo,
16};
17use nu_protocol::{
18 PluginIdentity, PluginRegistryFile, PluginRegistryItem, PluginRegistryItemData,
19 RegisteredPlugin, ShellError, Span, engine::StateWorkingSet, report_shell_error,
20};
21
22use crate::{
23 PersistentPlugin, PluginDeclaration, PluginGc, PluginInterface, PluginInterfaceManager,
24 PluginSource,
25};
26
/// Capacity, in bytes, of the buffered reader and writer that
/// `make_plugin_interface_with_streams` wraps around the plugin's streams (16 KiB).
pub(crate) const OUTPUT_BUFFER_SIZE: usize = 16 * 1024;
31
32pub fn create_command(
35 path: &Path,
36 mut shell: Option<&Path>,
37 mode: &CommunicationMode,
38) -> std::process::Command {
39 log::trace!("Starting plugin: {path:?}, shell = {shell:?}, mode = {mode:?}");
40
41 let mut shell_args = vec![];
42
43 if shell.is_none() {
44 shell = match path.extension().and_then(|e| e.to_str()) {
49 Some("sh") => {
50 if cfg!(unix) {
51 None
54 } else {
55 Some(Path::new("sh"))
56 }
57 }
58 Some("nu") => {
59 shell_args.push("--stdin");
60 Some(Path::new("nu"))
61 }
62 Some("py") => Some(Path::new("python")),
63 Some("rb") => Some(Path::new("ruby")),
64 Some("jar") => {
65 shell_args.push("-jar");
66 Some(Path::new("java"))
67 }
68 _ => None,
69 };
70 }
71
72 let mut process = if let Some(shell) = shell {
73 let mut process = std::process::Command::new(shell);
74 process.args(shell_args);
75 process.arg(path);
76
77 process
78 } else {
79 std::process::Command::new(path)
80 };
81
82 process.args(mode.args());
83
84 mode.setup_command_io(&mut process);
86
87 #[cfg(unix)]
89 process.process_group(0);
90 #[cfg(windows)]
91 process.creation_flags(windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP.0);
92
93 if let Some(dirname) = path.parent() {
96 process.current_dir(dirname);
97 }
98
99 process
100}
101
102pub fn make_plugin_interface(
107 mut child: Child,
108 comm: PreparedServerCommunication,
109 source: Arc<PluginSource>,
110 pid: Option<u32>,
111 gc: Option<PluginGc>,
112) -> Result<PluginInterface, ShellError> {
113 match comm.connect(&mut child)? {
114 ServerCommunicationIo::Stdio(stdin, stdout) => make_plugin_interface_with_streams(
115 stdout,
116 stdin,
117 move || {
118 let _ = child.wait();
119 },
120 source,
121 pid,
122 gc,
123 ),
124 #[cfg(feature = "local-socket")]
125 ServerCommunicationIo::LocalSocket { read_out, write_in } => {
126 make_plugin_interface_with_streams(
127 read_out,
128 write_in,
129 move || {
130 let _ = child.wait();
131 },
132 source,
133 pid,
134 gc,
135 )
136 }
137 }
138}
139
140pub fn make_plugin_interface_with_streams(
147 mut reader: impl std::io::Read + Send + 'static,
148 writer: impl std::io::Write + Send + 'static,
149 after_close: impl FnOnce() + Send + 'static,
150 source: Arc<PluginSource>,
151 pid: Option<u32>,
152 gc: Option<PluginGc>,
153) -> Result<PluginInterface, ShellError> {
154 let encoder = get_plugin_encoding(&mut reader)?;
155
156 let reader = BufReader::with_capacity(OUTPUT_BUFFER_SIZE, reader);
157 let writer = BufWriter::with_capacity(OUTPUT_BUFFER_SIZE, writer);
158
159 let mut manager =
160 PluginInterfaceManager::new(source.clone(), pid, (Mutex::new(writer), encoder));
161 manager.set_garbage_collector(gc);
162
163 let interface = manager.get_interface();
164 interface.hello()?;
165
166 std::thread::Builder::new()
170 .name(format!(
171 "plugin interface reader ({})",
172 source.identity.name()
173 ))
174 .spawn(move || {
175 if let Err(err) = manager.consume_all((reader, encoder)) {
176 log::warn!("Error in PluginInterfaceManager: {err}");
177 }
178 drop(manager);
181 after_close();
182 })
183 .map_err(|err| ShellError::PluginFailedToLoad {
184 msg: format!("Failed to spawn thread for plugin: {err}"),
185 })?;
186
187 Ok(interface)
188}
189
190pub fn get_plugin_encoding(
195 child_stdout: &mut impl std::io::Read,
196) -> Result<EncodingType, ShellError> {
197 let mut length_buf = [0u8; 1];
198 child_stdout
199 .read_exact(&mut length_buf)
200 .map_err(|e| ShellError::PluginFailedToLoad {
201 msg: format!("unable to get encoding from plugin: {e}"),
202 })?;
203
204 let mut buf = vec![0u8; length_buf[0] as usize];
205 child_stdout
206 .read_exact(&mut buf)
207 .map_err(|e| ShellError::PluginFailedToLoad {
208 msg: format!("unable to get encoding from plugin: {e}"),
209 })?;
210
211 EncodingType::try_from_bytes(&buf).ok_or_else(|| {
212 let encoding_for_debug = String::from_utf8_lossy(&buf);
213 ShellError::PluginFailedToLoad {
214 msg: format!("get unsupported plugin encoding: {encoding_for_debug}"),
215 }
216 })
217}
218
219pub fn load_plugin_file(
221 working_set: &mut StateWorkingSet,
222 plugin_registry_file: &PluginRegistryFile,
223 span: Option<Span>,
224) {
225 for plugin in &plugin_registry_file.plugins {
226 if let Err(err) = load_plugin_registry_item(working_set, plugin, span) {
228 report_shell_error(working_set.permanent_state, &err)
229 }
230 }
231}
232
233pub fn load_plugin_registry_item(
235 working_set: &mut StateWorkingSet,
236 plugin: &PluginRegistryItem,
237 span: Option<Span>,
238) -> Result<Arc<PersistentPlugin>, ShellError> {
239 let identity =
240 PluginIdentity::new(plugin.filename.clone(), plugin.shell.clone()).map_err(|_| {
241 ShellError::GenericError {
242 error: "Invalid plugin filename in plugin registry file".into(),
243 msg: "loaded from here".into(),
244 span,
245 help: Some(format!(
246 "the filename for `{}` is not a valid nushell plugin: {}",
247 plugin.name,
248 plugin.filename.display()
249 )),
250 inner: vec![],
251 }
252 })?;
253
254 match &plugin.data {
255 PluginRegistryItemData::Valid { metadata, commands } => {
256 let plugin = add_plugin_to_working_set(working_set, &identity)?;
257
258 plugin.reset()?;
262
263 plugin.set_metadata(Some(metadata.clone()));
265
266 for signature in commands {
268 let decl = PluginDeclaration::new(plugin.clone(), signature.clone());
269 working_set.add_decl(Box::new(decl));
270 }
271 Ok(plugin)
272 }
273 PluginRegistryItemData::Invalid => Err(ShellError::PluginRegistryDataInvalid {
274 plugin_name: identity.name().to_owned(),
275 span,
276 add_command: identity.add_command(),
277 }),
278 }
279}
280
281pub fn add_plugin_to_working_set(
286 working_set: &mut StateWorkingSet,
287 identity: &PluginIdentity,
288) -> Result<Arc<PersistentPlugin>, ShellError> {
289 let gc_config = working_set
291 .get_config()
292 .plugin_gc
293 .get(identity.name())
294 .clone();
295
296 let plugin = working_set.find_or_create_plugin(identity, || {
298 Arc::new(PersistentPlugin::new(identity.clone(), gc_config.clone()))
299 });
300
301 plugin.set_gc_config(&gc_config);
302
303 plugin
307 .as_any()
308 .downcast()
309 .map_err(|_| ShellError::NushellFailed {
310 msg: "encountered unexpected RegisteredPlugin type".into(),
311 })
312}