1use std::{
2 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3 sync::{
4 Arc, Mutex,
5 mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6 },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use nu_utils::time::Instant;
11#[cfg(not(target_family = "wasm"))]
12use std::time::Duration;
13
14use nu_system::{UnfreezeHandle, kill_by_pid};
15
16use crate::{PipelineData, Signals, shell_error};
17
18use crate::JobId;
19
/// Table of all jobs currently tracked, keyed by [`JobId`].
pub struct Jobs {
    /// Numeric value used to build the id of the next job registered through `add_job`;
    /// incremented after every such registration.
    next_job_id: usize,

    /// Id of the frozen job that was added most recently, if any.
    ///
    /// Reset to `None` when that job is removed or when every job is killed.
    last_frozen_job_id: Option<JobId>,
    /// The tracked jobs themselves.
    jobs: HashMap<JobId, Job>,
}
29
30impl Default for Jobs {
31 fn default() -> Self {
32 Self {
33 next_job_id: 1,
34 last_frozen_job_id: None,
35 jobs: HashMap::default(),
36 }
37 }
38}
39
40impl Jobs {
41 pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
42 self.jobs.iter().map(|(k, v)| (*k, v))
43 }
44
45 pub fn lookup(&self, id: JobId) -> Option<&Job> {
46 self.jobs.get(&id)
47 }
48
49 pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
50 self.jobs.get_mut(&id)
51 }
52
53 pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
54 if self.last_frozen_job_id.is_some_and(|last| id == last) {
55 self.last_frozen_job_id = None;
56 }
57
58 self.jobs.remove(&id)
59 }
60
61 fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
62 if let Job::Frozen(_) = job {
63 self.last_frozen_job_id = Some(id);
64 }
65 }
66
67 pub fn add_job(&mut self, job: Job) -> JobId {
68 let this_id = JobId::new(self.next_job_id);
69
70 self.assign_last_frozen_id_if_frozen(this_id, &job);
71
72 self.jobs.insert(this_id, job);
73 self.next_job_id += 1;
74
75 this_id
76 }
77
78 pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
79 self.last_frozen_job_id
80 }
81
82 pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
84 self.assign_last_frozen_id_if_frozen(id, &job);
85
86 if let std::collections::hash_map::Entry::Vacant(e) = self.jobs.entry(id) {
87 e.insert(job);
88 Ok(())
89 } else {
90 Err("job already exists")
91 }
92 }
93
94 pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
98 if let Some(job) = self.jobs.get(&id) {
99 let err = job.kill();
100
101 self.remove_job(id);
102
103 err?
104 }
105
106 Ok(())
107 }
108
109 pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
115 self.last_frozen_job_id = None;
116
117 let first_err = self
118 .iter()
119 .map(|(_, job)| job.kill().err())
120 .fold(None, |acc, x| acc.or(x));
121
122 self.jobs.clear();
123
124 if let Some(err) = first_err {
125 Err(err)
126 } else {
127 Ok(())
128 }
129 }
130}
131
/// A job tracked by [`Jobs`].
pub enum Job {
    /// A job represented by a [`ThreadJob`].
    Thread(ThreadJob),
    /// A job represented by a [`FrozenJob`]; adding one to [`Jobs`] makes it the most
    /// recent frozen job.
    Frozen(FrozenJob),
}
136
137#[derive(Clone)]
146pub struct ThreadJob {
147 signals: Signals,
148 pids: Arc<Mutex<HashSet<u32>>>,
149 description: Option<String>,
150 pub sender: Sender<Mail>,
151}
152
153impl ThreadJob {
154 pub fn new(signals: Signals, description: Option<String>, sender: Sender<Mail>) -> Self {
155 ThreadJob {
156 signals,
157 pids: Arc::new(Mutex::new(HashSet::default())),
158 sender,
159 description,
160 }
161 }
162
163 pub fn try_add_pid(&self, pid: u32) -> bool {
168 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
169
170 if self.signals.interrupted() {
172 false
173 } else {
174 pids.insert(pid);
175 true
176 }
177 }
178
179 pub fn collect_pids(&self) -> Vec<u32> {
180 let lock = self.pids.lock().expect("PID lock was poisoned");
181
182 lock.iter().copied().collect()
183 }
184
185 pub fn kill(&self) -> shell_error::io::Result<()> {
186 self.signals.trigger();
190
191 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
192
193 for pid in pids.iter() {
194 kill_by_pid((*pid).into())?;
195 }
196
197 pids.clear();
198
199 Ok(())
200 }
201
202 pub fn remove_pid(&self, pid: u32) {
203 let mut pids = self.pids.lock().expect("PID lock was poisoned");
204
205 pids.remove(&pid);
206 }
207}
208
209impl Job {
210 pub fn kill(&self) -> shell_error::io::Result<()> {
211 match self {
212 Job::Thread(thread_job) => thread_job.kill(),
213 Job::Frozen(frozen_job) => frozen_job.kill(),
214 }
215 }
216
217 pub fn description(&self) -> Option<&String> {
218 match self {
219 Job::Thread(thread_job) => thread_job.description.as_ref(),
220 Job::Frozen(frozen_job) => frozen_job.description.as_ref(),
221 }
222 }
223
224 pub fn assign_description(&mut self, description: Option<String>) {
225 match self {
226 Job::Thread(thread_job) => thread_job.description = description,
227 Job::Frozen(frozen_job) => frozen_job.description = description,
228 }
229 }
230}
231
232pub struct FrozenJob {
233 pub unfreeze: UnfreezeHandle,
234 pub description: Option<String>,
235}
236
237impl FrozenJob {
238 pub fn kill(&self) -> shell_error::io::Result<()> {
239 #[cfg(unix)]
240 {
241 Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
242 }
243
244 #[cfg(not(unix))]
246 {
247 Ok(())
248 }
249 }
250}
251
/// Job-related state bundled together: a job id, an optional thread-job handle and a
/// shared mailbox.
///
/// Cloning is shallow with respect to the mailbox: clones share the same
/// `Arc<Mutex<Mailbox>>`.
#[derive(Clone)]
pub struct CurrentJob {
    /// Id of the job this state belongs to.
    pub id: JobId,

    /// Handle of the background thread job, if there is one.
    // NOTE(review): presumably `None` means the code is running as the main
    // (foreground) job rather than a background thread job -- confirm against callers.
    pub background_thread_job: Option<ThreadJob>,

    /// Mailbox from which this job receives mail, behind a mutex so that it can be
    /// shared between clones.
    pub mailbox: Arc<Mutex<Mailbox>>,
}
265
266pub struct Mailbox {
272 receiver: Receiver<Mail>,
273 ignored_mail: IgnoredMail,
274}
275
276impl Mailbox {
277 pub fn new(receiver: Receiver<Mail>) -> Self {
278 Mailbox {
279 receiver,
280 ignored_mail: IgnoredMail::default(),
281 }
282 }
283
284 #[cfg(not(target_family = "wasm"))]
285 pub fn recv_timeout(
286 &mut self,
287 filter_tag: Option<FilterTag>,
288 timeout: Duration,
289 ) -> Result<PipelineData, RecvTimeoutError> {
290 if let Some(value) = self.ignored_mail.pop(filter_tag) {
291 Ok(value)
292 } else {
293 let mut waited_so_far = Duration::ZERO;
294 let mut before = Instant::now();
295
296 while waited_so_far < timeout {
297 let (tag, value) = self
298 .receiver
299 .recv_timeout(timeout.checked_sub(waited_so_far).unwrap_or(Duration::ZERO))?;
300
301 if filter_tag.is_none() || filter_tag == tag {
302 return Ok(value);
303 } else {
304 self.ignored_mail.add((tag, value));
305 let now = Instant::now();
306 waited_so_far += now - before;
307 before = now;
308 }
309 }
310
311 Err(RecvTimeoutError::Timeout)
312 }
313 }
314
315 #[cfg(not(target_family = "wasm"))]
316 pub fn try_recv(
317 &mut self,
318 filter_tag: Option<FilterTag>,
319 ) -> Result<PipelineData, TryRecvError> {
320 if let Some(value) = self.ignored_mail.pop(filter_tag) {
321 Ok(value)
322 } else {
323 loop {
324 let (tag, value) = self.receiver.try_recv()?;
325
326 if filter_tag.is_none() || filter_tag == tag {
327 return Ok(value);
328 } else {
329 self.ignored_mail.add((tag, value));
330 }
331 }
332 }
333 }
334
335 pub fn clear(&mut self) {
336 self.ignored_mail.clear();
337
338 while self.receiver.try_recv().is_ok() {}
339 }
340}
341
342#[derive(Default)]
345struct IgnoredMail {
346 next_id: usize,
347 messages: BTreeMap<usize, Mail>,
348 by_tag: HashMap<FilterTag, BTreeSet<usize>>,
349}
350
351pub type FilterTag = u64;
352pub type Mail = (Option<FilterTag>, PipelineData);
353
354impl IgnoredMail {
355 pub fn add(&mut self, (tag, value): Mail) {
356 let id = self.next_id;
357 self.next_id += 1;
358
359 self.messages.insert(id, (tag, value));
360
361 if let Some(tag) = tag {
362 self.by_tag.entry(tag).or_default().insert(id);
363 }
364 }
365
366 pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
367 if let Some(tag) = tag {
368 self.pop_oldest_with_tag(tag)
369 } else {
370 self.pop_oldest()
371 }
372 }
373
374 pub fn clear(&mut self) {
375 self.messages.clear();
376 self.by_tag.clear();
377 }
378
379 fn pop_oldest(&mut self) -> Option<PipelineData> {
380 let (id, (tag, value)) = self.messages.pop_first()?;
381
382 if let Some(tag) = tag {
383 let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) {
384 ids.remove(&id);
385 ids.is_empty()
386 } else {
387 false
388 };
389
390 if needs_cleanup {
391 self.by_tag.remove(&tag);
392 }
393 }
394
395 Some(value)
396 }
397
398 fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
399 let ids = self.by_tag.get_mut(&tag)?;
400
401 let id = ids.pop_first()?;
402
403 if ids.is_empty() {
404 self.by_tag.remove(&tag);
405 }
406
407 Some(self.messages.remove(&id)?.1)
408 }
409}