Skip to main content

nu_plugin_engine/
gc.rs

1use crate::PersistentPlugin;
2use nu_protocol::{PluginGcConfig, RegisteredPlugin};
3use std::{
4    sync::{Arc, Weak, mpsc},
5    thread,
6    time::Duration,
7};
8
9use nu_utils::time::Instant;
10
11/// Plugin garbage collector
12///
13/// Many users don't want all of their plugins to stay running indefinitely after using them, so
14/// this runs a thread that monitors the plugin's usage and stops it automatically if it meets
15/// certain conditions of inactivity.
16#[derive(Debug, Clone)]
17pub struct PluginGc {
18    sender: mpsc::Sender<PluginGcMsg>,
19}
20
21impl PluginGc {
22    /// Start a new plugin garbage collector. Returns an error if the thread failed to spawn.
23    pub fn new(
24        config: PluginGcConfig,
25        plugin: &Arc<PersistentPlugin>,
26    ) -> std::io::Result<PluginGc> {
27        let (sender, receiver) = mpsc::channel();
28
29        let mut state = PluginGcState {
30            config,
31            last_update: None,
32            locks: 0,
33            disabled: false,
34            plugin: Arc::downgrade(plugin),
35            name: plugin.identity().name().to_owned(),
36        };
37
38        thread::Builder::new()
39            .name(format!("plugin gc ({})", plugin.identity().name()))
40            .spawn(move || state.run(receiver))?;
41
42        Ok(PluginGc { sender })
43    }
44
45    /// Update the garbage collector config
46    pub fn set_config(&self, config: PluginGcConfig) {
47        let _ = self.sender.send(PluginGcMsg::SetConfig(config));
48    }
49
50    /// Ensure all GC messages have been processed
51    pub fn flush(&self) {
52        let (tx, rx) = mpsc::channel();
53        let _ = self.sender.send(PluginGcMsg::Flush(tx));
54        // This will block until the channel is dropped, which could be because the send failed, or
55        // because the GC got the message
56        let _ = rx.recv();
57    }
58
59    /// Increment the number of locks held by the plugin
60    pub fn increment_locks(&self, amount: i64) {
61        let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
62    }
63
64    /// Decrement the number of locks held by the plugin
65    pub fn decrement_locks(&self, amount: i64) {
66        let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
67    }
68
69    /// Set whether the GC is disabled by explicit request from the plugin. This is separate from
70    /// the `enabled` option in the config, and overrides that option.
71    pub fn set_disabled(&self, disabled: bool) {
72        let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
73    }
74
75    /// Tell the GC to stop tracking the plugin. The plugin will not be stopped. The GC cannot be
76    /// reactivated after this request - a new one must be created instead.
77    pub fn stop_tracking(&self) {
78        let _ = self.sender.send(PluginGcMsg::StopTracking);
79    }
80
81    /// Tell the GC that the plugin exited so that it can remove it from the persistent plugin.
82    ///
83    /// The reason the plugin tells the GC rather than just stopping itself via `source` is that
84    /// it can't guarantee that the plugin currently pointed to by `source` is itself, but if the
85    /// GC is still running, it hasn't received [`.stop_tracking()`](Self::stop_tracking) yet, which
86    /// means it should be the right plugin.
87    pub fn exited(&self) {
88        let _ = self.sender.send(PluginGcMsg::Exited);
89    }
90}
91
92#[derive(Debug)]
93enum PluginGcMsg {
94    SetConfig(PluginGcConfig),
95    Flush(mpsc::Sender<()>),
96    AddLocks(i64),
97    SetDisabled(bool),
98    StopTracking,
99    Exited,
100}
101
102#[derive(Debug)]
103struct PluginGcState {
104    config: PluginGcConfig,
105    last_update: Option<Instant>,
106    locks: i64,
107    disabled: bool,
108    plugin: Weak<PersistentPlugin>,
109    name: String,
110}
111
112impl PluginGcState {
113    fn next_timeout(&self, now: Instant) -> Option<Duration> {
114        if self.locks <= 0 && !self.disabled {
115            self.last_update
116                .zip(self.config.enabled.then_some(self.config.stop_after))
117                .map(|(last_update, stop_after)| {
118                    // If configured to stop, and used at some point, calculate the difference
119                    let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
120                    let duration_since_last_update = now.duration_since(last_update);
121                    stop_after_duration.saturating_sub(duration_since_last_update)
122                })
123        } else {
124            // Don't timeout if there are locks set, or disabled
125            None
126        }
127    }
128
129    // returns `Some()` if the GC should not continue to operate, with `true` if it should stop the
130    // plugin, or `false` if it should not
131    fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
132        match msg {
133            PluginGcMsg::SetConfig(config) => {
134                self.config = config;
135            }
136            PluginGcMsg::Flush(sender) => {
137                // Rather than sending a message, we just drop the channel, which causes the other
138                // side to disconnect equally well
139                drop(sender);
140            }
141            PluginGcMsg::AddLocks(amount) => {
142                self.locks += amount;
143                if self.locks < 0 {
144                    log::warn!(
145                        "Plugin GC ({name}) problem: locks count below zero after adding \
146                            {amount}: locks={locks}",
147                        name = self.name,
148                        locks = self.locks,
149                    );
150                }
151                // Any time locks are modified, that counts as activity
152                self.last_update = Some(Instant::now());
153            }
154            PluginGcMsg::SetDisabled(disabled) => {
155                self.disabled = disabled;
156            }
157            PluginGcMsg::StopTracking => {
158                // Immediately exit without stopping the plugin
159                return Some(false);
160            }
161            PluginGcMsg::Exited => {
162                // Exit and stop the plugin
163                return Some(true);
164            }
165        }
166        None
167    }
168
169    fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
170        let mut always_stop = false;
171
172        loop {
173            let Some(msg) = (match self.next_timeout(Instant::now()) {
174                Some(duration) => receiver.recv_timeout(duration).ok(),
175                None => receiver.recv().ok(),
176            }) else {
177                // If the timeout was reached, or the channel is disconnected, break the loop
178                break;
179            };
180
181            log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
182
183            if let Some(should_stop) = self.handle_message(msg) {
184                // Exit the GC
185                if should_stop {
186                    // If should_stop = true, attempt to stop the plugin
187                    always_stop = true;
188                    break;
189                } else {
190                    // Don't stop the plugin
191                    return;
192                }
193            }
194        }
195
196        // Upon exiting the loop, if the timeout reached zero, or we are exiting due to an Exited
197        // message, stop the plugin
198        if always_stop
199            || self
200                .next_timeout(Instant::now())
201                .is_some_and(|t| t.is_zero())
202        {
203            // We only hold a weak reference, and it's not an error if we fail to upgrade it -
204            // that just means the plugin is definitely stopped anyway.
205            if let Some(plugin) = self.plugin.upgrade() {
206                let name = &self.name;
207                if let Err(err) = plugin.stop() {
208                    log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
209                } else {
210                    log::debug!("Plugin `{name}` successfully stopped by GC");
211                }
212            }
213        }
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Timeout length used by the tests that need a non-zero `stop_after`.
    const ONE_SECOND: Duration = Duration::from_secs(1);

    /// A state with the default config, no recorded activity, no locks, and no live plugin.
    fn test_state() -> PluginGcState {
        PluginGcState {
            config: PluginGcConfig::default(),
            last_update: None,
            locks: 0,
            disabled: false,
            plugin: Weak::new(),
            name: "test".into(),
        }
    }

    /// Like [`test_state`], but with the GC enabled by config and activity recorded at
    /// `last_update`.
    fn enabled_state(last_update: Instant) -> PluginGcState {
        let mut state = test_state();
        state.config.enabled = true;
        state.last_update = Some(last_update);
        state
    }

    #[test]
    fn timeout_configured_as_zero() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.config.stop_after = 0;

        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
    }

    #[test]
    fn timeout_past_deadline() {
        let now = Instant::now();
        let two_seconds_ago = now.checked_sub(Duration::from_secs(2)).unwrap();
        let mut state = enabled_state(two_seconds_ago);
        state.config.stop_after = ONE_SECOND.as_nanos() as i64;

        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
    }

    #[test]
    fn timeout_with_deadline_in_future() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.config.stop_after = ONE_SECOND.as_nanos() as i64;

        assert_eq!(Some(ONE_SECOND), state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_disabled_by_config() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.config.enabled = false;

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_disabled_by_plugin() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.disabled = true;

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_locks_count_over_zero() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.locks = 1;

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn adding_locks_changes_last_update() {
        let before = Some(Instant::now().checked_sub(ONE_SECOND).unwrap());
        let mut state = test_state();
        state.last_update = before;

        state.handle_message(PluginGcMsg::AddLocks(1));

        assert_ne!(before, state.last_update, "not updated");
    }
}