1use crate::PersistentPlugin;
2use nu_protocol::{PluginGcConfig, RegisteredPlugin};
3use std::{
4 sync::{Arc, Weak, mpsc},
5 thread,
6 time::Duration,
7};
8
9use nu_utils::time::Instant;
10
11#[derive(Debug, Clone)]
17pub struct PluginGc {
18 sender: mpsc::Sender<PluginGcMsg>,
19}
20
21impl PluginGc {
22 pub fn new(
24 config: PluginGcConfig,
25 plugin: &Arc<PersistentPlugin>,
26 ) -> std::io::Result<PluginGc> {
27 let (sender, receiver) = mpsc::channel();
28
29 let mut state = PluginGcState {
30 config,
31 last_update: None,
32 locks: 0,
33 disabled: false,
34 plugin: Arc::downgrade(plugin),
35 name: plugin.identity().name().to_owned(),
36 };
37
38 thread::Builder::new()
39 .name(format!("plugin gc ({})", plugin.identity().name()))
40 .spawn(move || state.run(receiver))?;
41
42 Ok(PluginGc { sender })
43 }
44
45 pub fn set_config(&self, config: PluginGcConfig) {
47 let _ = self.sender.send(PluginGcMsg::SetConfig(config));
48 }
49
50 pub fn flush(&self) {
52 let (tx, rx) = mpsc::channel();
53 let _ = self.sender.send(PluginGcMsg::Flush(tx));
54 let _ = rx.recv();
57 }
58
59 pub fn increment_locks(&self, amount: i64) {
61 let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
62 }
63
64 pub fn decrement_locks(&self, amount: i64) {
66 let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
67 }
68
69 pub fn set_disabled(&self, disabled: bool) {
72 let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
73 }
74
75 pub fn stop_tracking(&self) {
78 let _ = self.sender.send(PluginGcMsg::StopTracking);
79 }
80
81 pub fn exited(&self) {
88 let _ = self.sender.send(PluginGcMsg::Exited);
89 }
90}
91
92#[derive(Debug)]
93enum PluginGcMsg {
94 SetConfig(PluginGcConfig),
95 Flush(mpsc::Sender<()>),
96 AddLocks(i64),
97 SetDisabled(bool),
98 StopTracking,
99 Exited,
100}
101
102#[derive(Debug)]
103struct PluginGcState {
104 config: PluginGcConfig,
105 last_update: Option<Instant>,
106 locks: i64,
107 disabled: bool,
108 plugin: Weak<PersistentPlugin>,
109 name: String,
110}
111
112impl PluginGcState {
113 fn next_timeout(&self, now: Instant) -> Option<Duration> {
114 if self.locks <= 0 && !self.disabled {
115 self.last_update
116 .zip(self.config.enabled.then_some(self.config.stop_after))
117 .map(|(last_update, stop_after)| {
118 let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
120 let duration_since_last_update = now.duration_since(last_update);
121 stop_after_duration.saturating_sub(duration_since_last_update)
122 })
123 } else {
124 None
126 }
127 }
128
129 fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
132 match msg {
133 PluginGcMsg::SetConfig(config) => {
134 self.config = config;
135 }
136 PluginGcMsg::Flush(sender) => {
137 drop(sender);
140 }
141 PluginGcMsg::AddLocks(amount) => {
142 self.locks += amount;
143 if self.locks < 0 {
144 log::warn!(
145 "Plugin GC ({name}) problem: locks count below zero after adding \
146 {amount}: locks={locks}",
147 name = self.name,
148 locks = self.locks,
149 );
150 }
151 self.last_update = Some(Instant::now());
153 }
154 PluginGcMsg::SetDisabled(disabled) => {
155 self.disabled = disabled;
156 }
157 PluginGcMsg::StopTracking => {
158 return Some(false);
160 }
161 PluginGcMsg::Exited => {
162 return Some(true);
164 }
165 }
166 None
167 }
168
169 fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
170 let mut always_stop = false;
171
172 loop {
173 let Some(msg) = (match self.next_timeout(Instant::now()) {
174 Some(duration) => receiver.recv_timeout(duration).ok(),
175 None => receiver.recv().ok(),
176 }) else {
177 break;
179 };
180
181 log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
182
183 if let Some(should_stop) = self.handle_message(msg) {
184 if should_stop {
186 always_stop = true;
188 break;
189 } else {
190 return;
192 }
193 }
194 }
195
196 if always_stop
199 || self
200 .next_timeout(Instant::now())
201 .is_some_and(|t| t.is_zero())
202 {
203 if let Some(plugin) = self.plugin.upgrade() {
206 let name = &self.name;
207 if let Err(err) = plugin.stop() {
208 log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
209 } else {
210 log::debug!("Plugin `{name}` successfully stopped by GC");
211 }
212 }
213 }
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// One second expressed in nanoseconds, the unit used for `stop_after`.
    fn one_second_nanos() -> i64 {
        Duration::from_secs(1).as_nanos() as i64
    }

    /// Baseline state: default config, no activity, no locks, not disabled, no plugin.
    fn test_state() -> PluginGcState {
        PluginGcState {
            config: PluginGcConfig::default(),
            last_update: None,
            locks: 0,
            disabled: false,
            plugin: Weak::new(),
            name: "test".into(),
        }
    }

    /// Baseline state with GC enabled in the config and activity recorded at `last_update`.
    fn enabled_state(last_update: Instant) -> PluginGcState {
        let mut state = test_state();
        state.config.enabled = true;
        state.last_update = Some(last_update);
        state
    }

    #[test]
    fn timeout_configured_as_zero() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.config.stop_after = 0;

        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
    }

    #[test]
    fn timeout_past_deadline() {
        let now = Instant::now();
        let two_seconds_ago = now.checked_sub(Duration::from_secs(2)).unwrap();
        let mut state = enabled_state(two_seconds_ago);
        state.config.stop_after = one_second_nanos();

        assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
    }

    #[test]
    fn timeout_with_deadline_in_future() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.config.stop_after = one_second_nanos();

        assert_eq!(Some(Duration::from_secs(1)), state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_disabled_by_config() {
        let now = Instant::now();
        let mut state = test_state();
        state.config.enabled = false;
        state.last_update = Some(now);

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_disabled_by_plugin() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.disabled = true;

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn no_timeout_if_locks_count_over_zero() {
        let now = Instant::now();
        let mut state = enabled_state(now);
        state.locks = 1;

        assert_eq!(None, state.next_timeout(now));
    }

    #[test]
    fn adding_locks_changes_last_update() {
        let before = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
        let mut state = test_state();
        state.last_update = before;

        state.handle_message(PluginGcMsg::AddLocks(1));

        assert_ne!(before, state.last_update, "not updated");
    }
}