1use crate::PersistentPlugin;
2use nu_protocol::{PluginGcConfig, RegisteredPlugin};
3use std::{
4 sync::{Arc, Weak, mpsc},
5 thread,
6 time::{Duration, Instant},
7};
8
9#[derive(Debug, Clone)]
15pub struct PluginGc {
16 sender: mpsc::Sender<PluginGcMsg>,
17}
18
19impl PluginGc {
20 pub fn new(
22 config: PluginGcConfig,
23 plugin: &Arc<PersistentPlugin>,
24 ) -> std::io::Result<PluginGc> {
25 let (sender, receiver) = mpsc::channel();
26
27 let mut state = PluginGcState {
28 config,
29 last_update: None,
30 locks: 0,
31 disabled: false,
32 plugin: Arc::downgrade(plugin),
33 name: plugin.identity().name().to_owned(),
34 };
35
36 thread::Builder::new()
37 .name(format!("plugin gc ({})", plugin.identity().name()))
38 .spawn(move || state.run(receiver))?;
39
40 Ok(PluginGc { sender })
41 }
42
43 pub fn set_config(&self, config: PluginGcConfig) {
45 let _ = self.sender.send(PluginGcMsg::SetConfig(config));
46 }
47
48 pub fn flush(&self) {
50 let (tx, rx) = mpsc::channel();
51 let _ = self.sender.send(PluginGcMsg::Flush(tx));
52 let _ = rx.recv();
55 }
56
57 pub fn increment_locks(&self, amount: i64) {
59 let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
60 }
61
62 pub fn decrement_locks(&self, amount: i64) {
64 let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
65 }
66
67 pub fn set_disabled(&self, disabled: bool) {
70 let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
71 }
72
73 pub fn stop_tracking(&self) {
76 let _ = self.sender.send(PluginGcMsg::StopTracking);
77 }
78
79 pub fn exited(&self) {
86 let _ = self.sender.send(PluginGcMsg::Exited);
87 }
88}
89
90#[derive(Debug)]
91enum PluginGcMsg {
92 SetConfig(PluginGcConfig),
93 Flush(mpsc::Sender<()>),
94 AddLocks(i64),
95 SetDisabled(bool),
96 StopTracking,
97 Exited,
98}
99
100#[derive(Debug)]
101struct PluginGcState {
102 config: PluginGcConfig,
103 last_update: Option<Instant>,
104 locks: i64,
105 disabled: bool,
106 plugin: Weak<PersistentPlugin>,
107 name: String,
108}
109
110impl PluginGcState {
111 fn next_timeout(&self, now: Instant) -> Option<Duration> {
112 if self.locks <= 0 && !self.disabled {
113 self.last_update
114 .zip(self.config.enabled.then_some(self.config.stop_after))
115 .map(|(last_update, stop_after)| {
116 let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
118 let duration_since_last_update = now.duration_since(last_update);
119 stop_after_duration.saturating_sub(duration_since_last_update)
120 })
121 } else {
122 None
124 }
125 }
126
127 fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
130 match msg {
131 PluginGcMsg::SetConfig(config) => {
132 self.config = config;
133 }
134 PluginGcMsg::Flush(sender) => {
135 drop(sender);
138 }
139 PluginGcMsg::AddLocks(amount) => {
140 self.locks += amount;
141 if self.locks < 0 {
142 log::warn!(
143 "Plugin GC ({name}) problem: locks count below zero after adding \
144 {amount}: locks={locks}",
145 name = self.name,
146 locks = self.locks,
147 );
148 }
149 self.last_update = Some(Instant::now());
151 }
152 PluginGcMsg::SetDisabled(disabled) => {
153 self.disabled = disabled;
154 }
155 PluginGcMsg::StopTracking => {
156 return Some(false);
158 }
159 PluginGcMsg::Exited => {
160 return Some(true);
162 }
163 }
164 None
165 }
166
167 fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
168 let mut always_stop = false;
169
170 loop {
171 let Some(msg) = (match self.next_timeout(Instant::now()) {
172 Some(duration) => receiver.recv_timeout(duration).ok(),
173 None => receiver.recv().ok(),
174 }) else {
175 break;
177 };
178
179 log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
180
181 if let Some(should_stop) = self.handle_message(msg) {
182 if should_stop {
184 always_stop = true;
186 break;
187 } else {
188 return;
190 }
191 }
192 }
193
194 if always_stop
197 || self
198 .next_timeout(Instant::now())
199 .is_some_and(|t| t.is_zero())
200 {
201 if let Some(plugin) = self.plugin.upgrade() {
204 let name = &self.name;
205 if let Err(err) = plugin.stop() {
206 log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
207 } else {
208 log::debug!("Plugin `{name}` successfully stopped by GC");
209 }
210 }
211 }
212 }
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 fn test_state() -> PluginGcState {
220 PluginGcState {
221 config: PluginGcConfig::default(),
222 last_update: None,
223 locks: 0,
224 disabled: false,
225 plugin: Weak::new(),
226 name: "test".into(),
227 }
228 }
229
230 #[test]
231 fn timeout_configured_as_zero() {
232 let now = Instant::now();
233 let mut state = test_state();
234 state.config.enabled = true;
235 state.config.stop_after = 0;
236 state.last_update = Some(now);
237
238 assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
239 }
240
241 #[test]
242 fn timeout_past_deadline() {
243 let now = Instant::now();
244 let mut state = test_state();
245 state.config.enabled = true;
246 state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
247 state.last_update = Some(now.checked_sub(Duration::from_secs(2)).unwrap());
248
249 assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
250 }
251
252 #[test]
253 fn timeout_with_deadline_in_future() {
254 let now = Instant::now();
255 let mut state = test_state();
256 state.config.enabled = true;
257 state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
258 state.last_update = Some(now);
259
260 assert_eq!(Some(Duration::from_secs(1)), state.next_timeout(now));
261 }
262
263 #[test]
264 fn no_timeout_if_disabled_by_config() {
265 let now = Instant::now();
266 let mut state = test_state();
267 state.config.enabled = false;
268 state.last_update = Some(now);
269
270 assert_eq!(None, state.next_timeout(now));
271 }
272
273 #[test]
274 fn no_timeout_if_disabled_by_plugin() {
275 let now = Instant::now();
276 let mut state = test_state();
277 state.config.enabled = true;
278 state.disabled = true;
279 state.last_update = Some(now);
280
281 assert_eq!(None, state.next_timeout(now));
282 }
283
284 #[test]
285 fn no_timeout_if_locks_count_over_zero() {
286 let now = Instant::now();
287 let mut state = test_state();
288 state.config.enabled = true;
289 state.locks = 1;
290 state.last_update = Some(now);
291
292 assert_eq!(None, state.next_timeout(now));
293 }
294
295 #[test]
296 fn adding_locks_changes_last_update() {
297 let mut state = test_state();
298 let original_last_update =
299 Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
300 state.last_update = original_last_update;
301 state.handle_message(PluginGcMsg::AddLocks(1));
302 assert_ne!(original_last_update, state.last_update, "not updated");
303 }
304}