nu_plugin_engine/
gc.rs

1use crate::PersistentPlugin;
2use nu_protocol::{PluginGcConfig, RegisteredPlugin};
3use std::{
4    sync::{Arc, Weak, mpsc},
5    thread,
6    time::{Duration, Instant},
7};
8
9/// Plugin garbage collector
10///
11/// Many users don't want all of their plugins to stay running indefinitely after using them, so
12/// this runs a thread that monitors the plugin's usage and stops it automatically if it meets
13/// certain conditions of inactivity.
14#[derive(Debug, Clone)]
15pub struct PluginGc {
16    sender: mpsc::Sender<PluginGcMsg>,
17}
18
19impl PluginGc {
20    /// Start a new plugin garbage collector. Returns an error if the thread failed to spawn.
21    pub fn new(
22        config: PluginGcConfig,
23        plugin: &Arc<PersistentPlugin>,
24    ) -> std::io::Result<PluginGc> {
25        let (sender, receiver) = mpsc::channel();
26
27        let mut state = PluginGcState {
28            config,
29            last_update: None,
30            locks: 0,
31            disabled: false,
32            plugin: Arc::downgrade(plugin),
33            name: plugin.identity().name().to_owned(),
34        };
35
36        thread::Builder::new()
37            .name(format!("plugin gc ({})", plugin.identity().name()))
38            .spawn(move || state.run(receiver))?;
39
40        Ok(PluginGc { sender })
41    }
42
43    /// Update the garbage collector config
44    pub fn set_config(&self, config: PluginGcConfig) {
45        let _ = self.sender.send(PluginGcMsg::SetConfig(config));
46    }
47
48    /// Ensure all GC messages have been processed
49    pub fn flush(&self) {
50        let (tx, rx) = mpsc::channel();
51        let _ = self.sender.send(PluginGcMsg::Flush(tx));
52        // This will block until the channel is dropped, which could be because the send failed, or
53        // because the GC got the message
54        let _ = rx.recv();
55    }
56
57    /// Increment the number of locks held by the plugin
58    pub fn increment_locks(&self, amount: i64) {
59        let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
60    }
61
62    /// Decrement the number of locks held by the plugin
63    pub fn decrement_locks(&self, amount: i64) {
64        let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
65    }
66
67    /// Set whether the GC is disabled by explicit request from the plugin. This is separate from
68    /// the `enabled` option in the config, and overrides that option.
69    pub fn set_disabled(&self, disabled: bool) {
70        let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
71    }
72
73    /// Tell the GC to stop tracking the plugin. The plugin will not be stopped. The GC cannot be
74    /// reactivated after this request - a new one must be created instead.
75    pub fn stop_tracking(&self) {
76        let _ = self.sender.send(PluginGcMsg::StopTracking);
77    }
78
79    /// Tell the GC that the plugin exited so that it can remove it from the persistent plugin.
80    ///
81    /// The reason the plugin tells the GC rather than just stopping itself via `source` is that
82    /// it can't guarantee that the plugin currently pointed to by `source` is itself, but if the
83    /// GC is still running, it hasn't received [`.stop_tracking()`](Self::stop_tracking) yet, which
84    /// means it should be the right plugin.
85    pub fn exited(&self) {
86        let _ = self.sender.send(PluginGcMsg::Exited);
87    }
88}
89
90#[derive(Debug)]
91enum PluginGcMsg {
92    SetConfig(PluginGcConfig),
93    Flush(mpsc::Sender<()>),
94    AddLocks(i64),
95    SetDisabled(bool),
96    StopTracking,
97    Exited,
98}
99
100#[derive(Debug)]
101struct PluginGcState {
102    config: PluginGcConfig,
103    last_update: Option<Instant>,
104    locks: i64,
105    disabled: bool,
106    plugin: Weak<PersistentPlugin>,
107    name: String,
108}
109
110impl PluginGcState {
111    fn next_timeout(&self, now: Instant) -> Option<Duration> {
112        if self.locks <= 0 && !self.disabled {
113            self.last_update
114                .zip(self.config.enabled.then_some(self.config.stop_after))
115                .map(|(last_update, stop_after)| {
116                    // If configured to stop, and used at some point, calculate the difference
117                    let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
118                    let duration_since_last_update = now.duration_since(last_update);
119                    stop_after_duration.saturating_sub(duration_since_last_update)
120                })
121        } else {
122            // Don't timeout if there are locks set, or disabled
123            None
124        }
125    }
126
127    // returns `Some()` if the GC should not continue to operate, with `true` if it should stop the
128    // plugin, or `false` if it should not
129    fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
130        match msg {
131            PluginGcMsg::SetConfig(config) => {
132                self.config = config;
133            }
134            PluginGcMsg::Flush(sender) => {
135                // Rather than sending a message, we just drop the channel, which causes the other
136                // side to disconnect equally well
137                drop(sender);
138            }
139            PluginGcMsg::AddLocks(amount) => {
140                self.locks += amount;
141                if self.locks < 0 {
142                    log::warn!(
143                        "Plugin GC ({name}) problem: locks count below zero after adding \
144                            {amount}: locks={locks}",
145                        name = self.name,
146                        locks = self.locks,
147                    );
148                }
149                // Any time locks are modified, that counts as activity
150                self.last_update = Some(Instant::now());
151            }
152            PluginGcMsg::SetDisabled(disabled) => {
153                self.disabled = disabled;
154            }
155            PluginGcMsg::StopTracking => {
156                // Immediately exit without stopping the plugin
157                return Some(false);
158            }
159            PluginGcMsg::Exited => {
160                // Exit and stop the plugin
161                return Some(true);
162            }
163        }
164        None
165    }
166
167    fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
168        let mut always_stop = false;
169
170        loop {
171            let Some(msg) = (match self.next_timeout(Instant::now()) {
172                Some(duration) => receiver.recv_timeout(duration).ok(),
173                None => receiver.recv().ok(),
174            }) else {
175                // If the timeout was reached, or the channel is disconnected, break the loop
176                break;
177            };
178
179            log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
180
181            if let Some(should_stop) = self.handle_message(msg) {
182                // Exit the GC
183                if should_stop {
184                    // If should_stop = true, attempt to stop the plugin
185                    always_stop = true;
186                    break;
187                } else {
188                    // Don't stop the plugin
189                    return;
190                }
191            }
192        }
193
194        // Upon exiting the loop, if the timeout reached zero, or we are exiting due to an Exited
195        // message, stop the plugin
196        if always_stop
197            || self
198                .next_timeout(Instant::now())
199                .is_some_and(|t| t.is_zero())
200        {
201            // We only hold a weak reference, and it's not an error if we fail to upgrade it -
202            // that just means the plugin is definitely stopped anyway.
203            if let Some(plugin) = self.plugin.upgrade() {
204                let name = &self.name;
205                if let Err(err) = plugin.stop() {
206                    log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
207                } else {
208                    log::debug!("Plugin `{name}` successfully stopped by GC");
209                }
210            }
211        }
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218
219    fn test_state() -> PluginGcState {
220        PluginGcState {
221            config: PluginGcConfig::default(),
222            last_update: None,
223            locks: 0,
224            disabled: false,
225            plugin: Weak::new(),
226            name: "test".into(),
227        }
228    }
229
230    #[test]
231    fn timeout_configured_as_zero() {
232        let now = Instant::now();
233        let mut state = test_state();
234        state.config.enabled = true;
235        state.config.stop_after = 0;
236        state.last_update = Some(now);
237
238        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
239    }
240
241    #[test]
242    fn timeout_past_deadline() {
243        let now = Instant::now();
244        let mut state = test_state();
245        state.config.enabled = true;
246        state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
247        state.last_update = Some(now.checked_sub(Duration::from_secs(2)).unwrap());
248
249        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
250    }
251
252    #[test]
253    fn timeout_with_deadline_in_future() {
254        let now = Instant::now();
255        let mut state = test_state();
256        state.config.enabled = true;
257        state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
258        state.last_update = Some(now);
259
260        assert_eq!(Some(Duration::from_secs(1)), state.next_timeout(now));
261    }
262
263    #[test]
264    fn no_timeout_if_disabled_by_config() {
265        let now = Instant::now();
266        let mut state = test_state();
267        state.config.enabled = false;
268        state.last_update = Some(now);
269
270        assert_eq!(None, state.next_timeout(now));
271    }
272
273    #[test]
274    fn no_timeout_if_disabled_by_plugin() {
275        let now = Instant::now();
276        let mut state = test_state();
277        state.config.enabled = true;
278        state.disabled = true;
279        state.last_update = Some(now);
280
281        assert_eq!(None, state.next_timeout(now));
282    }
283
284    #[test]
285    fn no_timeout_if_locks_count_over_zero() {
286        let now = Instant::now();
287        let mut state = test_state();
288        state.config.enabled = true;
289        state.locks = 1;
290        state.last_update = Some(now);
291
292        assert_eq!(None, state.next_timeout(now));
293    }
294
295    #[test]
296    fn adding_locks_changes_last_update() {
297        let mut state = test_state();
298        let original_last_update =
299            Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
300        state.last_update = original_last_update;
301        state.handle_message(PluginGcMsg::AddLocks(1));
302        assert_ne!(original_last_update, state.last_update, "not updated");
303    }
304}