1use std::{
2 collections::{BTreeMap, BTreeSet, HashMap, HashSet, hash_map::Entry},
3 sync::{
4 Arc, Mutex,
5 mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6 },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use nu_utils::time::Instant;
11#[cfg(not(target_family = "wasm"))]
12use std::time::Duration;
13
14use nu_system::{UnfreezeHandle, kill_by_pid};
15
16use crate::{PipelineData, Signals, shell_error};
17
18use crate::JobId;
19
20#[derive(Debug)]
21pub struct Jobs {
22 next_job_id: usize,
23
24 last_frozen_job_id: Option<JobId>,
28 jobs: HashMap<JobId, Job>,
29}
30
31impl Default for Jobs {
32 fn default() -> Self {
33 Self {
34 next_job_id: 1,
35 last_frozen_job_id: None,
36 jobs: HashMap::default(),
37 }
38 }
39}
40
41impl Jobs {
42 pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
43 self.jobs.iter().map(|(k, v)| (*k, v))
44 }
45
46 pub fn lookup(&self, id: JobId) -> Option<&Job> {
47 self.jobs.get(&id)
48 }
49
50 pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
51 self.jobs.get_mut(&id)
52 }
53
54 pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
55 if self.last_frozen_job_id.is_some_and(|last| id == last) {
56 self.last_frozen_job_id = None;
57 }
58
59 self.jobs.remove(&id)
60 }
61
62 fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
63 if let Job::Frozen(_) = job {
64 self.last_frozen_job_id = Some(id);
65 }
66 }
67
68 pub fn add_job(&mut self, job: Job) -> JobId {
69 let this_id = JobId::new(self.next_job_id);
70
71 self.assign_last_frozen_id_if_frozen(this_id, &job);
72
73 self.jobs.insert(this_id, job);
74 self.next_job_id += 1;
75
76 this_id
77 }
78
79 pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
80 self.last_frozen_job_id
81 }
82
83 pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
85 self.assign_last_frozen_id_if_frozen(id, &job);
86
87 if let Entry::Vacant(e) = self.jobs.entry(id) {
88 e.insert(job);
89 Ok(())
90 } else {
91 Err("job already exists")
92 }
93 }
94
95 pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
99 if let Some(job) = self.jobs.get(&id) {
100 let err = job.kill();
101
102 self.remove_job(id);
103
104 err?
105 }
106
107 Ok(())
108 }
109
110 pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
116 self.last_frozen_job_id = None;
117
118 let first_err = self
119 .iter()
120 .map(|(_, job)| job.kill().err())
121 .fold(None, |acc, x| acc.or(x));
122
123 self.jobs.clear();
124
125 if let Some(err) = first_err {
126 Err(err)
127 } else {
128 Ok(())
129 }
130 }
131}
132
133#[derive(Debug)]
134pub enum Job {
135 Thread(ThreadJob),
136 Frozen(FrozenJob),
137}
138
139#[derive(Clone, Debug)]
148pub struct ThreadJob {
149 signals: Signals,
150 pids: Arc<Mutex<HashSet<u32>>>,
151 description: Option<String>,
152 pub sender: Sender<Mail>,
153}
154
155impl ThreadJob {
156 pub fn new(signals: Signals, description: Option<String>, sender: Sender<Mail>) -> Self {
157 ThreadJob {
158 signals,
159 pids: Arc::new(Mutex::new(HashSet::default())),
160 sender,
161 description,
162 }
163 }
164
165 pub fn try_add_pid(&self, pid: u32) -> bool {
170 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
171
172 if self.signals.interrupted() {
174 false
175 } else {
176 pids.insert(pid);
177 true
178 }
179 }
180
181 pub fn collect_pids(&self) -> Vec<u32> {
182 let lock = self.pids.lock().expect("PID lock was poisoned");
183
184 lock.iter().copied().collect()
185 }
186
187 pub fn kill(&self) -> shell_error::io::Result<()> {
188 self.signals.trigger();
192
193 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
194
195 for pid in pids.iter() {
196 kill_by_pid((*pid).into())?;
197 }
198
199 pids.clear();
200
201 Ok(())
202 }
203
204 pub fn remove_pid(&self, pid: u32) {
205 let mut pids = self.pids.lock().expect("PID lock was poisoned");
206
207 pids.remove(&pid);
208 }
209}
210
211impl Job {
212 pub fn kill(&self) -> shell_error::io::Result<()> {
213 match self {
214 Job::Thread(thread_job) => thread_job.kill(),
215 Job::Frozen(frozen_job) => frozen_job.kill(),
216 }
217 }
218
219 pub fn description(&self) -> Option<&String> {
220 match self {
221 Job::Thread(thread_job) => thread_job.description.as_ref(),
222 Job::Frozen(frozen_job) => frozen_job.description.as_ref(),
223 }
224 }
225
226 pub fn assign_description(&mut self, description: Option<String>) {
227 match self {
228 Job::Thread(thread_job) => thread_job.description = description,
229 Job::Frozen(frozen_job) => frozen_job.description = description,
230 }
231 }
232}
233
234#[derive(Debug)]
235pub struct FrozenJob {
236 pub unfreeze: UnfreezeHandle,
237 pub description: Option<String>,
238}
239
240impl FrozenJob {
241 pub fn kill(&self) -> shell_error::io::Result<()> {
242 #[cfg(unix)]
243 {
244 Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
245 }
246
247 #[cfg(not(unix))]
249 {
250 Ok(())
251 }
252 }
253}
254
255#[derive(Clone, Debug)]
257pub struct CurrentJob {
258 pub id: JobId,
259
260 pub background_thread_job: Option<ThreadJob>,
263
264 pub mailbox: Arc<Mutex<Mailbox>>,
267}
268
269#[derive(Debug)]
275pub struct Mailbox {
276 receiver: Receiver<Mail>,
277 ignored_mail: IgnoredMail,
278}
279
280impl Mailbox {
281 pub fn new(receiver: Receiver<Mail>) -> Self {
282 Mailbox {
283 receiver,
284 ignored_mail: IgnoredMail::default(),
285 }
286 }
287
288 #[cfg(not(target_family = "wasm"))]
289 pub fn recv_timeout(
290 &mut self,
291 filter_tag: Option<FilterTag>,
292 timeout: Duration,
293 ) -> Result<PipelineData, RecvTimeoutError> {
294 if let Some(value) = self.ignored_mail.pop(filter_tag) {
295 Ok(value)
296 } else {
297 let mut waited_so_far = Duration::ZERO;
298 let mut before = Instant::now();
299
300 while waited_so_far < timeout {
301 let (tag, value) = self
302 .receiver
303 .recv_timeout(timeout.checked_sub(waited_so_far).unwrap_or(Duration::ZERO))?;
304
305 if filter_tag.is_none() || filter_tag == tag {
306 return Ok(value);
307 } else {
308 self.ignored_mail.add((tag, value));
309 let now = Instant::now();
310 waited_so_far += now - before;
311 before = now;
312 }
313 }
314
315 Err(RecvTimeoutError::Timeout)
316 }
317 }
318
319 #[cfg(not(target_family = "wasm"))]
320 pub fn try_recv(
321 &mut self,
322 filter_tag: Option<FilterTag>,
323 ) -> Result<PipelineData, TryRecvError> {
324 if let Some(value) = self.ignored_mail.pop(filter_tag) {
325 Ok(value)
326 } else {
327 loop {
328 let (tag, value) = self.receiver.try_recv()?;
329
330 if filter_tag.is_none() || filter_tag == tag {
331 return Ok(value);
332 } else {
333 self.ignored_mail.add((tag, value));
334 }
335 }
336 }
337 }
338
339 pub fn clear(&mut self) {
340 self.ignored_mail.clear();
341
342 while self.receiver.try_recv().is_ok() {}
343 }
344}
345
346#[derive(Default, Debug)]
349struct IgnoredMail {
350 next_id: usize,
351 messages: BTreeMap<usize, Mail>,
352 by_tag: HashMap<FilterTag, BTreeSet<usize>>,
353}
354
355pub type FilterTag = u64;
356pub type Mail = (Option<FilterTag>, PipelineData);
357
358impl IgnoredMail {
359 pub fn add(&mut self, (tag, value): Mail) {
360 let id = self.next_id;
361 self.next_id += 1;
362
363 self.messages.insert(id, (tag, value));
364
365 if let Some(tag) = tag {
366 self.by_tag.entry(tag).or_default().insert(id);
367 }
368 }
369
370 pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
371 if let Some(tag) = tag {
372 self.pop_oldest_with_tag(tag)
373 } else {
374 self.pop_oldest()
375 }
376 }
377
378 pub fn clear(&mut self) {
379 self.messages.clear();
380 self.by_tag.clear();
381 }
382
383 fn pop_oldest(&mut self) -> Option<PipelineData> {
384 let (id, (tag, value)) = self.messages.pop_first()?;
385
386 if let Some(tag) = tag
387 && let Entry::Occupied(mut occupied_entry) = self.by_tag.entry(tag)
388 {
389 occupied_entry.get_mut().remove(&id);
390 if occupied_entry.get().is_empty() {
391 occupied_entry.remove();
392 }
393 }
394
395 Some(value)
396 }
397
398 fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
399 let ids = self.by_tag.get_mut(&tag)?;
400
401 let id = ids.pop_first()?;
402
403 if ids.is_empty() {
404 self.by_tag.remove(&tag);
405 }
406
407 Some(self.messages.remove(&id)?.1)
408 }
409}