Skip to main content

nu_plugin_engine/
persistent.rs

1use crate::{
2    PluginGc,
3    init::{create_command, make_plugin_interface},
4};
5
6use super::{PluginInterface, PluginSource};
7use nu_plugin_core::CommunicationMode;
8use nu_protocol::{
9    HandlerGuard, Handlers, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin,
10    ShellError,
11    engine::{EngineState, Stack},
12    shell_error::io::IoError,
13};
14use std::{
15    collections::HashMap,
16    sync::{Arc, Mutex},
17};
18
/// A box that can keep a spawned plugin persistent for further uses. The plugin may or may not be
/// currently running. [`.get()`](Self::get) returns the interface of the currently running plugin,
/// or spawns the plugin first if it's not running.
#[derive(Debug)]
pub struct PersistentPlugin {
    /// Identity (filename, shell, name) of the plugin. Never changes after construction.
    identity: PluginIdentity,
    /// Everything that can change after construction (running plugin, metadata, preferred
    /// communication mode, GC config, signal guard), kept behind a single lock.
    mutable: Mutex<MutableState>,
}
29
/// The mutable state for the persistent plugin. This should all be behind one lock to prevent lock
/// order problems.
#[derive(Debug)]
struct MutableState {
    /// The running plugin, if any. Set by `spawn()` on success; cleared at the start of `spawn()`
    /// and by `stop()` / `reset()`.
    running: Option<RunningPlugin>,
    /// Metadata for the plugin, e.g. version. Only changed through `set_metadata()`.
    metadata: Option<PluginMetadata>,
    /// Plugin's preferred communication mode (if known). `None` means not yet determined: the
    /// next spawn starts with stdio. `reset()` puts this back to `None`.
    preferred_mode: Option<PreferredCommunicationMode>,
    /// Garbage collector config, cloned into a new `PluginGc` every time the plugin is spawned.
    gc_config: PluginGcConfig,
    /// RAII guard for this plugin's signal handler. Stored by `configure_signal_handler()`.
    signal_guard: Option<HandlerGuard>,
}
45
/// The communication mode to spawn the plugin with, once one has been settled on.
///
/// This is stored as `Option<PreferredCommunicationMode>`; the `None` case means "not decided
/// yet" and makes `spawn()` start with stdio.
#[derive(Debug, Clone, Copy)]
enum PreferredCommunicationMode {
    /// Communicate over stdio. Also the mode `get()` falls back to when a spawn fails while any
    /// other mode (or no mode) is preferred.
    Stdio,
    /// Communicate over a local socket. Only exists when the `local-socket` feature is enabled,
    /// and is selected by `spawn()` when the plugin reports support for it.
    #[cfg(feature = "local-socket")]
    LocalSocket,
}
52
/// The pieces that only exist while the plugin is running. They are created together by
/// `spawn()` and dropped together when the plugin is stopped.
#[derive(Debug)]
struct RunningPlugin {
    /// Interface (which can be cloned) to the running plugin
    interface: PluginInterface,
    /// Garbage collector for the plugin. `stop_tracking()` is called on it before this struct is
    /// dropped so that it doesn't accidentally try to stop a plugin spawned later.
    gc: PluginGc,
}
60
61impl PersistentPlugin {
62    /// Create a new persistent plugin. The plugin will not be spawned immediately.
63    pub fn new(identity: PluginIdentity, gc_config: PluginGcConfig) -> PersistentPlugin {
64        PersistentPlugin {
65            identity,
66            mutable: Mutex::new(MutableState {
67                running: None,
68                metadata: None,
69                preferred_mode: None,
70                gc_config,
71                signal_guard: None,
72            }),
73        }
74    }
75
76    /// Get the plugin interface of the running plugin, or spawn it if it's not currently running.
77    ///
78    /// Will call `envs` to get environment variables to spawn the plugin if the plugin needs to be
79    /// spawned.
80    pub fn get(
81        self: Arc<Self>,
82        envs: impl FnOnce() -> Result<HashMap<String, String>, ShellError>,
83    ) -> Result<PluginInterface, ShellError> {
84        let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
85            msg: format!(
86                "plugin `{}` mutex poisoned, probably panic during spawn",
87                self.identity.name()
88            ),
89        })?;
90
91        if let Some(ref running) = mutable.running {
92            // It exists, so just clone the interface
93            Ok(running.interface.clone())
94        } else {
95            // Try to spawn. On success, `mutable.running` should have been set to the new running
96            // plugin by `spawn()` so we just then need to clone the interface from there.
97            //
98            // We hold the lock the whole time to prevent others from trying to spawn and ending
99            // up with duplicate plugins
100            //
101            // TODO: We should probably store the envs somewhere, in case we have to launch without
102            // envs (e.g. from a custom value)
103            let envs = envs()?;
104            let result = self.clone().spawn(&envs, &mut mutable);
105
106            // Check if we were using an alternate communication mode and may need to fall back to
107            // stdio.
108            if result.is_err()
109                && !matches!(
110                    mutable.preferred_mode,
111                    Some(PreferredCommunicationMode::Stdio)
112                )
113            {
114                log::warn!(
115                    "{}: Trying again with stdio communication because mode {:?} failed with {result:?}",
116                    self.identity.name(),
117                    mutable.preferred_mode
118                );
119                // Reset to stdio and try again, but this time don't catch any error
120                mutable.preferred_mode = Some(PreferredCommunicationMode::Stdio);
121                self.clone().spawn(&envs, &mut mutable)?;
122            }
123
124            Ok(mutable
125                .running
126                .as_ref()
127                .ok_or_else(|| ShellError::NushellFailed {
128                    msg: "spawn() succeeded but didn't set interface".into(),
129                })?
130                .interface
131                .clone())
132        }
133    }
134
135    /// Run the plugin command, then set up and set `mutable.running` to the new running plugin.
136    fn spawn(
137        self: Arc<Self>,
138        envs: &HashMap<String, String>,
139        mutable: &mut MutableState,
140    ) -> Result<(), ShellError> {
141        // Make sure `running` is set to None to begin
142        if let Some(running) = mutable.running.take() {
143            // Stop the GC if there was a running plugin
144            running.gc.stop_tracking();
145        }
146
147        let source_file = self.identity.filename();
148
149        // Determine the mode to use based on the preferred mode
150        let mode = match mutable.preferred_mode {
151            // If not set, we try stdio first and then might retry if another mode is supported
152            Some(PreferredCommunicationMode::Stdio) | None => CommunicationMode::Stdio,
153            // Local socket only if enabled
154            #[cfg(feature = "local-socket")]
155            Some(PreferredCommunicationMode::LocalSocket) => {
156                CommunicationMode::local_socket(source_file)
157            }
158        };
159
160        let mut plugin_cmd = create_command(source_file, self.identity.shell(), &mode);
161
162        // We need the current environment variables for `python` based plugins
163        // Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
164        plugin_cmd.envs(envs);
165
166        let program_name = plugin_cmd.get_program().to_os_string().into_string();
167
168        // Before running the command, prepare communication
169        let comm = mode.serve()?;
170
171        // Run the plugin command
172        let child = plugin_cmd.spawn().map_err(|err| {
173            let error_msg = match err.kind() {
174                std::io::ErrorKind::NotFound => match program_name {
175                    Ok(prog_name) => {
176                        format!(
177                            "Can't find {prog_name}, please make sure that {prog_name} is in PATH."
178                        )
179                    }
180                    _ => {
181                        format!("Error spawning child process: {err}")
182                    }
183                },
184                _ => {
185                    format!("Error spawning child process: {err}")
186                }
187            };
188            ShellError::PluginFailedToLoad { msg: error_msg }
189        })?;
190
191        // Start the plugin garbage collector
192        let gc = PluginGc::new(mutable.gc_config.clone(), &self)
193            .map_err(|err| IoError::new_internal(err, "Could not start plugin gc"))?;
194
195        let pid = child.id();
196        let interface = make_plugin_interface(
197            child,
198            comm,
199            Arc::new(PluginSource::new(self.clone())),
200            Some(pid),
201            Some(gc.clone()),
202        )?;
203
204        // If our current preferred mode is None, check to see if the plugin might support another
205        // mode. If so, retry spawn() with that mode
206        #[cfg(feature = "local-socket")]
207        if mutable.preferred_mode.is_none()
208            && interface
209                .protocol_info()?
210                .supports_feature(&nu_plugin_protocol::Feature::LocalSocket)
211        {
212            log::trace!(
213                "{}: Attempting to upgrade to local socket mode",
214                self.identity.name()
215            );
216            // Stop the GC we just created from tracking so that we don't accidentally try to
217            // stop the new plugin
218            gc.stop_tracking();
219            // Set the mode and try again
220            mutable.preferred_mode = Some(PreferredCommunicationMode::LocalSocket);
221            return self.spawn(envs, mutable);
222        }
223
224        mutable.running = Some(RunningPlugin { interface, gc });
225        Ok(())
226    }
227
228    fn stop_internal(&self, reset: bool) -> Result<(), ShellError> {
229        let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
230            msg: format!(
231                "plugin `{}` mutable mutex poisoned, probably panic during spawn",
232                self.identity.name()
233            ),
234        })?;
235
236        // If the plugin is running, stop its GC, so that the GC doesn't accidentally try to stop
237        // a future plugin
238        if let Some(ref running) = mutable.running {
239            running.gc.stop_tracking();
240        }
241
242        // We don't try to kill the process or anything, we just drop the RunningPlugin. It should
243        // exit soon after
244        mutable.running = None;
245
246        // If this is a reset, we should also reset other learned attributes like preferred_mode
247        if reset {
248            mutable.preferred_mode = None;
249        }
250        Ok(())
251    }
252}
253
254impl RegisteredPlugin for PersistentPlugin {
255    fn identity(&self) -> &PluginIdentity {
256        &self.identity
257    }
258
259    fn is_running(&self) -> bool {
260        // If the lock is poisoned, we return false here. That may not be correct, but this is a
261        // failure state anyway that would be noticed at some point
262        self.mutable
263            .lock()
264            .map(|m| m.running.is_some())
265            .unwrap_or(false)
266    }
267
268    fn pid(&self) -> Option<u32> {
269        // Again, we return None for a poisoned lock.
270        self.mutable
271            .lock()
272            .ok()
273            .and_then(|r| r.running.as_ref().and_then(|r| r.interface.pid()))
274    }
275
276    fn stop(&self) -> Result<(), ShellError> {
277        self.stop_internal(false)
278    }
279
280    fn reset(&self) -> Result<(), ShellError> {
281        self.stop_internal(true)
282    }
283
284    fn metadata(&self) -> Option<PluginMetadata> {
285        self.mutable.lock().ok().and_then(|m| m.metadata.clone())
286    }
287
288    fn set_metadata(&self, metadata: Option<PluginMetadata>) {
289        if let Ok(mut mutable) = self.mutable.lock() {
290            mutable.metadata = metadata;
291        }
292    }
293
294    fn set_gc_config(&self, gc_config: &PluginGcConfig) {
295        if let Ok(mut mutable) = self.mutable.lock() {
296            // Save the new config for future calls
297            mutable.gc_config = gc_config.clone();
298
299            // If the plugin is already running, propagate the config change to the running GC
300            if let Some(gc) = mutable.running.as_ref().map(|running| running.gc.clone()) {
301                // We don't want to get caught holding the lock
302                drop(mutable);
303                gc.set_config(gc_config.clone());
304                gc.flush();
305            }
306        }
307    }
308
309    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync> {
310        self
311    }
312
313    fn configure_signal_handler(self: Arc<Self>, handlers: &Handlers) -> Result<(), ShellError> {
314        let guard = {
315            // We take a weakref to the plugin so that we don't create a cycle to the
316            // RAII guard that will be stored on the plugin.
317            let plugin = Arc::downgrade(&self);
318            handlers.register(Box::new(move |action| {
319                // write a signal packet through the PluginInterface if the plugin is alive and
320                // running
321                if let Some(plugin) = plugin.upgrade()
322                    && let Ok(mutable) = plugin.mutable.lock()
323                    && let Some(ref running) = mutable.running
324                {
325                    let _ = running.interface.signal(action);
326                }
327            }))?
328        };
329
330        if let Ok(mut mutable) = self.mutable.lock() {
331            mutable.signal_guard = Some(guard);
332        }
333
334        Ok(())
335    }
336}
337
/// Anything that can produce a plugin interface.
///
/// This extends [`RegisteredPlugin`] with the ability to hand out a live [`PluginInterface`].
pub trait GetPlugin: RegisteredPlugin {
    /// Retrieve or spawn a [`PluginInterface`]. The `context` may be used for determining
    /// environment variables to launch the plugin with.
    ///
    /// What happens when `context` is `None` is up to the implementation. The
    /// [`PersistentPlugin`] implementation passes an empty environment map to its spawn step in
    /// that case.
    ///
    /// # Errors
    ///
    /// Returns a [`ShellError`] if the interface could not be retrieved or the plugin could not be
    /// spawned.
    fn get_plugin(
        self: Arc<Self>,
        context: Option<(&EngineState, &mut Stack)>,
    ) -> Result<PluginInterface, ShellError>;
}
347
348impl GetPlugin for PersistentPlugin {
349    fn get_plugin(
350        self: Arc<Self>,
351        mut context: Option<(&EngineState, &mut Stack)>,
352    ) -> Result<PluginInterface, ShellError> {
353        self.get(|| {
354            // Get envs from the context if provided.
355            let envs = context
356                .as_mut()
357                .map(|(engine_state, stack)| {
358                    // We need the current environment variables for `python` based plugins. Or
359                    // we'll likely have a problem when a plugin is implemented in a virtual Python
360                    // environment.
361                    let stack = &mut stack.start_collect_value();
362                    nu_engine::env::env_to_strings(engine_state, stack)
363                })
364                .transpose()?;
365
366            Ok(envs.unwrap_or_default())
367        })
368    }
369}