1use std::{
2 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3 sync::{
4 Arc, Mutex,
5 mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6 },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use std::time::{Duration, Instant};
11
12use nu_system::{UnfreezeHandle, kill_by_pid};
13
14use crate::{PipelineData, Signals, shell_error};
15
16use crate::JobId;
17
18pub struct Jobs {
19 next_job_id: usize,
20
21 last_frozen_job_id: Option<JobId>,
25 jobs: HashMap<JobId, Job>,
26}
27
28impl Default for Jobs {
29 fn default() -> Self {
30 Self {
31 next_job_id: 1,
32 last_frozen_job_id: None,
33 jobs: HashMap::default(),
34 }
35 }
36}
37
38impl Jobs {
39 pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
40 self.jobs.iter().map(|(k, v)| (*k, v))
41 }
42
43 pub fn lookup(&self, id: JobId) -> Option<&Job> {
44 self.jobs.get(&id)
45 }
46
47 pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
48 self.jobs.get_mut(&id)
49 }
50
51 pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
52 if self.last_frozen_job_id.is_some_and(|last| id == last) {
53 self.last_frozen_job_id = None;
54 }
55
56 self.jobs.remove(&id)
57 }
58
59 fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
60 if let Job::Frozen(_) = job {
61 self.last_frozen_job_id = Some(id);
62 }
63 }
64
65 pub fn add_job(&mut self, job: Job) -> JobId {
66 let this_id = JobId::new(self.next_job_id);
67
68 self.assign_last_frozen_id_if_frozen(this_id, &job);
69
70 self.jobs.insert(this_id, job);
71 self.next_job_id += 1;
72
73 this_id
74 }
75
76 pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
77 self.last_frozen_job_id
78 }
79
80 pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
82 self.assign_last_frozen_id_if_frozen(id, &job);
83
84 if let std::collections::hash_map::Entry::Vacant(e) = self.jobs.entry(id) {
85 e.insert(job);
86 Ok(())
87 } else {
88 Err("job already exists")
89 }
90 }
91
92 pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
96 if let Some(job) = self.jobs.get(&id) {
97 let err = job.kill();
98
99 self.remove_job(id);
100
101 err?
102 }
103
104 Ok(())
105 }
106
107 pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
113 self.last_frozen_job_id = None;
114
115 let first_err = self
116 .iter()
117 .map(|(_, job)| job.kill().err())
118 .fold(None, |acc, x| acc.or(x));
119
120 self.jobs.clear();
121
122 if let Some(err) = first_err {
123 Err(err)
124 } else {
125 Ok(())
126 }
127 }
128}
129
130pub enum Job {
131 Thread(ThreadJob),
132 Frozen(FrozenJob),
133}
134
135#[derive(Clone)]
144pub struct ThreadJob {
145 signals: Signals,
146 pids: Arc<Mutex<HashSet<u32>>>,
147 tag: Option<String>,
148 pub sender: Sender<Mail>,
149}
150
151impl ThreadJob {
152 pub fn new(signals: Signals, tag: Option<String>, sender: Sender<Mail>) -> Self {
153 ThreadJob {
154 signals,
155 pids: Arc::new(Mutex::new(HashSet::default())),
156 sender,
157 tag,
158 }
159 }
160
161 pub fn try_add_pid(&self, pid: u32) -> bool {
166 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
167
168 if self.signals.interrupted() {
170 false
171 } else {
172 pids.insert(pid);
173 true
174 }
175 }
176
177 pub fn collect_pids(&self) -> Vec<u32> {
178 let lock = self.pids.lock().expect("PID lock was poisoned");
179
180 lock.iter().copied().collect()
181 }
182
183 pub fn kill(&self) -> shell_error::io::Result<()> {
184 self.signals.trigger();
188
189 let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
190
191 for pid in pids.iter() {
192 kill_by_pid((*pid).into())?;
193 }
194
195 pids.clear();
196
197 Ok(())
198 }
199
200 pub fn remove_pid(&self, pid: u32) {
201 let mut pids = self.pids.lock().expect("PID lock was poisoned");
202
203 pids.remove(&pid);
204 }
205}
206
207impl Job {
208 pub fn kill(&self) -> shell_error::io::Result<()> {
209 match self {
210 Job::Thread(thread_job) => thread_job.kill(),
211 Job::Frozen(frozen_job) => frozen_job.kill(),
212 }
213 }
214
215 pub fn tag(&self) -> Option<&String> {
216 match self {
217 Job::Thread(thread_job) => thread_job.tag.as_ref(),
218 Job::Frozen(frozen_job) => frozen_job.tag.as_ref(),
219 }
220 }
221
222 pub fn assign_tag(&mut self, tag: Option<String>) {
223 match self {
224 Job::Thread(thread_job) => thread_job.tag = tag,
225 Job::Frozen(frozen_job) => frozen_job.tag = tag,
226 }
227 }
228}
229
230pub struct FrozenJob {
231 pub unfreeze: UnfreezeHandle,
232 pub tag: Option<String>,
233}
234
235impl FrozenJob {
236 pub fn kill(&self) -> shell_error::io::Result<()> {
237 #[cfg(unix)]
238 {
239 Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
240 }
241
242 #[cfg(not(unix))]
244 {
245 Ok(())
246 }
247 }
248}
249
250#[derive(Clone)]
252pub struct CurrentJob {
253 pub id: JobId,
254
255 pub background_thread_job: Option<ThreadJob>,
258
259 pub mailbox: Arc<Mutex<Mailbox>>,
262}
263
264pub struct Mailbox {
270 receiver: Receiver<Mail>,
271 ignored_mail: IgnoredMail,
272}
273
274impl Mailbox {
275 pub fn new(receiver: Receiver<Mail>) -> Self {
276 Mailbox {
277 receiver,
278 ignored_mail: IgnoredMail::default(),
279 }
280 }
281
282 #[cfg(not(target_family = "wasm"))]
283 pub fn recv_timeout(
284 &mut self,
285 filter_tag: Option<FilterTag>,
286 timeout: Duration,
287 ) -> Result<PipelineData, RecvTimeoutError> {
288 if let Some(value) = self.ignored_mail.pop(filter_tag) {
289 Ok(value)
290 } else {
291 let mut waited_so_far = Duration::ZERO;
292 let mut before = Instant::now();
293
294 while waited_so_far < timeout {
295 let (tag, value) = self.receiver.recv_timeout(timeout - waited_so_far)?;
296
297 if filter_tag.is_none() || filter_tag == tag {
298 return Ok(value);
299 } else {
300 self.ignored_mail.add((tag, value));
301 let now = Instant::now();
302 waited_so_far += now - before;
303 before = now;
304 }
305 }
306
307 Err(RecvTimeoutError::Timeout)
308 }
309 }
310
311 #[cfg(not(target_family = "wasm"))]
312 pub fn try_recv(
313 &mut self,
314 filter_tag: Option<FilterTag>,
315 ) -> Result<PipelineData, TryRecvError> {
316 if let Some(value) = self.ignored_mail.pop(filter_tag) {
317 Ok(value)
318 } else {
319 loop {
320 let (tag, value) = self.receiver.try_recv()?;
321
322 if filter_tag.is_none() || filter_tag == tag {
323 return Ok(value);
324 } else {
325 self.ignored_mail.add((tag, value));
326 }
327 }
328 }
329 }
330
331 pub fn clear(&mut self) {
332 self.ignored_mail.clear();
333
334 while self.receiver.try_recv().is_ok() {}
335 }
336}
337
338#[derive(Default)]
341struct IgnoredMail {
342 next_id: usize,
343 messages: BTreeMap<usize, Mail>,
344 by_tag: HashMap<FilterTag, BTreeSet<usize>>,
345}
346
347pub type FilterTag = u64;
348pub type Mail = (Option<FilterTag>, PipelineData);
349
350impl IgnoredMail {
351 pub fn add(&mut self, (tag, value): Mail) {
352 let id = self.next_id;
353 self.next_id += 1;
354
355 self.messages.insert(id, (tag, value));
356
357 if let Some(tag) = tag {
358 self.by_tag.entry(tag).or_default().insert(id);
359 }
360 }
361
362 pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
363 if let Some(tag) = tag {
364 self.pop_oldest_with_tag(tag)
365 } else {
366 self.pop_oldest()
367 }
368 }
369
370 pub fn clear(&mut self) {
371 self.messages.clear();
372 self.by_tag.clear();
373 }
374
375 fn pop_oldest(&mut self) -> Option<PipelineData> {
376 let (id, (tag, value)) = self.messages.pop_first()?;
377
378 if let Some(tag) = tag {
379 let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) {
380 ids.remove(&id);
381 ids.is_empty()
382 } else {
383 false
384 };
385
386 if needs_cleanup {
387 self.by_tag.remove(&tag);
388 }
389 }
390
391 Some(value)
392 }
393
394 fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
395 let ids = self.by_tag.get_mut(&tag)?;
396
397 let id = ids.pop_first()?;
398
399 if ids.is_empty() {
400 self.by_tag.remove(&tag);
401 }
402
403 Some(self.messages.remove(&id)?.1)
404 }
405}