nu_plugin_engine/
persistent.rs

1use crate::{
2    PluginGc,
3    init::{create_command, make_plugin_interface},
4};
5
6use super::{PluginInterface, PluginSource};
7use nu_plugin_core::CommunicationMode;
8use nu_protocol::{
9    HandlerGuard, Handlers, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin,
10    ShellError,
11    engine::{EngineState, Stack},
12    shell_error::io::IoError,
13};
14use std::{
15    collections::HashMap,
16    sync::{Arc, Mutex},
17};
18
19/// A box that can keep a plugin that was spawned persistent for further uses. The plugin may or
20/// may not be currently running. [`.get()`] gets the currently running plugin, or spawns it if it's
21/// not running.
22#[derive(Debug)]
23pub struct PersistentPlugin {
24    /// Identity (filename, shell, name) of the plugin
25    identity: PluginIdentity,
26    /// Mutable state
27    mutable: Mutex<MutableState>,
28}
29
30/// The mutable state for the persistent plugin. This should all be behind one lock to prevent lock
31/// order problems.
32#[derive(Debug)]
33struct MutableState {
34    /// Reference to the plugin if running
35    running: Option<RunningPlugin>,
36    /// Metadata for the plugin, e.g. version.
37    metadata: Option<PluginMetadata>,
38    /// Plugin's preferred communication mode (if known)
39    preferred_mode: Option<PreferredCommunicationMode>,
40    /// Garbage collector config
41    gc_config: PluginGcConfig,
42    /// RAII guard for this plugin's signal handler
43    signal_guard: Option<HandlerGuard>,
44}
45
46#[derive(Debug, Clone, Copy)]
47enum PreferredCommunicationMode {
48    Stdio,
49    #[cfg(feature = "local-socket")]
50    LocalSocket,
51}
52
53#[derive(Debug)]
54struct RunningPlugin {
55    /// Interface (which can be cloned) to the running plugin
56    interface: PluginInterface,
57    /// Garbage collector for the plugin
58    gc: PluginGc,
59}
60
61impl PersistentPlugin {
62    /// Create a new persistent plugin. The plugin will not be spawned immediately.
63    pub fn new(identity: PluginIdentity, gc_config: PluginGcConfig) -> PersistentPlugin {
64        PersistentPlugin {
65            identity,
66            mutable: Mutex::new(MutableState {
67                running: None,
68                metadata: None,
69                preferred_mode: None,
70                gc_config,
71                signal_guard: None,
72            }),
73        }
74    }
75
76    /// Get the plugin interface of the running plugin, or spawn it if it's not currently running.
77    ///
78    /// Will call `envs` to get environment variables to spawn the plugin if the plugin needs to be
79    /// spawned.
80    pub fn get(
81        self: Arc<Self>,
82        envs: impl FnOnce() -> Result<HashMap<String, String>, ShellError>,
83    ) -> Result<PluginInterface, ShellError> {
84        let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
85            msg: format!(
86                "plugin `{}` mutex poisoned, probably panic during spawn",
87                self.identity.name()
88            ),
89        })?;
90
91        if let Some(ref running) = mutable.running {
92            // It exists, so just clone the interface
93            Ok(running.interface.clone())
94        } else {
95            // Try to spawn. On success, `mutable.running` should have been set to the new running
96            // plugin by `spawn()` so we just then need to clone the interface from there.
97            //
98            // We hold the lock the whole time to prevent others from trying to spawn and ending
99            // up with duplicate plugins
100            //
101            // TODO: We should probably store the envs somewhere, in case we have to launch without
102            // envs (e.g. from a custom value)
103            let envs = envs()?;
104            let result = self.clone().spawn(&envs, &mut mutable);
105
106            // Check if we were using an alternate communication mode and may need to fall back to
107            // stdio.
108            if result.is_err()
109                && !matches!(
110                    mutable.preferred_mode,
111                    Some(PreferredCommunicationMode::Stdio)
112                )
113            {
114                log::warn!(
115                    "{}: Trying again with stdio communication because mode {:?} failed with {result:?}",
116                    self.identity.name(),
117                    mutable.preferred_mode
118                );
119                // Reset to stdio and try again, but this time don't catch any error
120                mutable.preferred_mode = Some(PreferredCommunicationMode::Stdio);
121                self.clone().spawn(&envs, &mut mutable)?;
122            }
123
124            Ok(mutable
125                .running
126                .as_ref()
127                .ok_or_else(|| ShellError::NushellFailed {
128                    msg: "spawn() succeeded but didn't set interface".into(),
129                })?
130                .interface
131                .clone())
132        }
133    }
134
135    /// Run the plugin command, then set up and set `mutable.running` to the new running plugin.
136    fn spawn(
137        self: Arc<Self>,
138        envs: &HashMap<String, String>,
139        mutable: &mut MutableState,
140    ) -> Result<(), ShellError> {
141        // Make sure `running` is set to None to begin
142        if let Some(running) = mutable.running.take() {
143            // Stop the GC if there was a running plugin
144            running.gc.stop_tracking();
145        }
146
147        let source_file = self.identity.filename();
148
149        // Determine the mode to use based on the preferred mode
150        let mode = match mutable.preferred_mode {
151            // If not set, we try stdio first and then might retry if another mode is supported
152            Some(PreferredCommunicationMode::Stdio) | None => CommunicationMode::Stdio,
153            // Local socket only if enabled
154            #[cfg(feature = "local-socket")]
155            Some(PreferredCommunicationMode::LocalSocket) => {
156                CommunicationMode::local_socket(source_file)
157            }
158        };
159
160        let mut plugin_cmd = create_command(source_file, self.identity.shell(), &mode);
161
162        // We need the current environment variables for `python` based plugins
163        // Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
164        plugin_cmd.envs(envs);
165
166        let program_name = plugin_cmd.get_program().to_os_string().into_string();
167
168        // Before running the command, prepare communication
169        let comm = mode.serve()?;
170
171        // Run the plugin command
172        let child = plugin_cmd.spawn().map_err(|err| {
173            let error_msg = match err.kind() {
174                std::io::ErrorKind::NotFound => match program_name {
175                    Ok(prog_name) => {
176                        format!(
177                            "Can't find {prog_name}, please make sure that {prog_name} is in PATH."
178                        )
179                    }
180                    _ => {
181                        format!("Error spawning child process: {err}")
182                    }
183                },
184                _ => {
185                    format!("Error spawning child process: {err}")
186                }
187            };
188            ShellError::PluginFailedToLoad { msg: error_msg }
189        })?;
190
191        // Start the plugin garbage collector
192        let gc = PluginGc::new(mutable.gc_config.clone(), &self).map_err(|err| {
193            IoError::new_internal(err, "Could not start plugin gc", nu_protocol::location!())
194        })?;
195
196        let pid = child.id();
197        let interface = make_plugin_interface(
198            child,
199            comm,
200            Arc::new(PluginSource::new(self.clone())),
201            Some(pid),
202            Some(gc.clone()),
203        )?;
204
205        // If our current preferred mode is None, check to see if the plugin might support another
206        // mode. If so, retry spawn() with that mode
207        #[cfg(feature = "local-socket")]
208        if mutable.preferred_mode.is_none()
209            && interface
210                .protocol_info()?
211                .supports_feature(&nu_plugin_protocol::Feature::LocalSocket)
212        {
213            log::trace!(
214                "{}: Attempting to upgrade to local socket mode",
215                self.identity.name()
216            );
217            // Stop the GC we just created from tracking so that we don't accidentally try to
218            // stop the new plugin
219            gc.stop_tracking();
220            // Set the mode and try again
221            mutable.preferred_mode = Some(PreferredCommunicationMode::LocalSocket);
222            return self.spawn(envs, mutable);
223        }
224
225        mutable.running = Some(RunningPlugin { interface, gc });
226        Ok(())
227    }
228
229    fn stop_internal(&self, reset: bool) -> Result<(), ShellError> {
230        let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
231            msg: format!(
232                "plugin `{}` mutable mutex poisoned, probably panic during spawn",
233                self.identity.name()
234            ),
235        })?;
236
237        // If the plugin is running, stop its GC, so that the GC doesn't accidentally try to stop
238        // a future plugin
239        if let Some(ref running) = mutable.running {
240            running.gc.stop_tracking();
241        }
242
243        // We don't try to kill the process or anything, we just drop the RunningPlugin. It should
244        // exit soon after
245        mutable.running = None;
246
247        // If this is a reset, we should also reset other learned attributes like preferred_mode
248        if reset {
249            mutable.preferred_mode = None;
250        }
251        Ok(())
252    }
253}
254
255impl RegisteredPlugin for PersistentPlugin {
256    fn identity(&self) -> &PluginIdentity {
257        &self.identity
258    }
259
260    fn is_running(&self) -> bool {
261        // If the lock is poisoned, we return false here. That may not be correct, but this is a
262        // failure state anyway that would be noticed at some point
263        self.mutable
264            .lock()
265            .map(|m| m.running.is_some())
266            .unwrap_or(false)
267    }
268
269    fn pid(&self) -> Option<u32> {
270        // Again, we return None for a poisoned lock.
271        self.mutable
272            .lock()
273            .ok()
274            .and_then(|r| r.running.as_ref().and_then(|r| r.interface.pid()))
275    }
276
277    fn stop(&self) -> Result<(), ShellError> {
278        self.stop_internal(false)
279    }
280
281    fn reset(&self) -> Result<(), ShellError> {
282        self.stop_internal(true)
283    }
284
285    fn metadata(&self) -> Option<PluginMetadata> {
286        self.mutable.lock().ok().and_then(|m| m.metadata.clone())
287    }
288
289    fn set_metadata(&self, metadata: Option<PluginMetadata>) {
290        if let Ok(mut mutable) = self.mutable.lock() {
291            mutable.metadata = metadata;
292        }
293    }
294
295    fn set_gc_config(&self, gc_config: &PluginGcConfig) {
296        if let Ok(mut mutable) = self.mutable.lock() {
297            // Save the new config for future calls
298            mutable.gc_config = gc_config.clone();
299
300            // If the plugin is already running, propagate the config change to the running GC
301            if let Some(gc) = mutable.running.as_ref().map(|running| running.gc.clone()) {
302                // We don't want to get caught holding the lock
303                drop(mutable);
304                gc.set_config(gc_config.clone());
305                gc.flush();
306            }
307        }
308    }
309
310    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync> {
311        self
312    }
313
314    fn configure_signal_handler(self: Arc<Self>, handlers: &Handlers) -> Result<(), ShellError> {
315        let guard = {
316            // We take a weakref to the plugin so that we don't create a cycle to the
317            // RAII guard that will be stored on the plugin.
318            let plugin = Arc::downgrade(&self);
319            handlers.register(Box::new(move |action| {
320                // write a signal packet through the PluginInterface if the plugin is alive and
321                // running
322                if let Some(plugin) = plugin.upgrade() {
323                    if let Ok(mutable) = plugin.mutable.lock() {
324                        if let Some(ref running) = mutable.running {
325                            let _ = running.interface.signal(action);
326                        }
327                    }
328                }
329            }))?
330        };
331
332        if let Ok(mut mutable) = self.mutable.lock() {
333            mutable.signal_guard = Some(guard);
334        }
335
336        Ok(())
337    }
338}
339
340/// Anything that can produce a plugin interface.
341pub trait GetPlugin: RegisteredPlugin {
342    /// Retrieve or spawn a [`PluginInterface`]. The `context` may be used for determining
343    /// environment variables to launch the plugin with.
344    fn get_plugin(
345        self: Arc<Self>,
346        context: Option<(&EngineState, &mut Stack)>,
347    ) -> Result<PluginInterface, ShellError>;
348}
349
350impl GetPlugin for PersistentPlugin {
351    fn get_plugin(
352        self: Arc<Self>,
353        mut context: Option<(&EngineState, &mut Stack)>,
354    ) -> Result<PluginInterface, ShellError> {
355        self.get(|| {
356            // Get envs from the context if provided.
357            let envs = context
358                .as_mut()
359                .map(|(engine_state, stack)| {
360                    // We need the current environment variables for `python` based plugins. Or
361                    // we'll likely have a problem when a plugin is implemented in a virtual Python
362                    // environment.
363                    let stack = &mut stack.start_collect_value();
364                    nu_engine::env::env_to_strings(engine_state, stack)
365                })
366                .transpose()?;
367
368            Ok(envs.unwrap_or_default())
369        })
370    }
371}