nu_protocol/engine/
jobs.rs

1use std::{
2    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3    sync::{
4        Arc, Mutex,
5        mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6    },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use std::time::{Duration, Instant};
11
12use nu_system::{UnfreezeHandle, kill_by_pid};
13
14use crate::{PipelineData, Signals, shell_error};
15
16use crate::JobId;
17
/// A table of jobs, keyed by [`JobId`].
pub struct Jobs {
    // the numeric value from which `add_job` builds the id of the next job;
    // it is incremented after every `add_job` call.
    next_job_id: usize,

    // this is the ID of the most recently added frozen job in the jobs table.
    // the methods of this struct must ensure the invariant of this always
    // being None or pointing to a valid job in the table
    last_frozen_job_id: Option<JobId>,
    // every job currently stored in the table.
    jobs: HashMap<JobId, Job>,
}
27
28impl Default for Jobs {
29    fn default() -> Self {
30        Self {
31            next_job_id: 1,
32            last_frozen_job_id: None,
33            jobs: HashMap::default(),
34        }
35    }
36}
37
38impl Jobs {
39    pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
40        self.jobs.iter().map(|(k, v)| (*k, v))
41    }
42
43    pub fn lookup(&self, id: JobId) -> Option<&Job> {
44        self.jobs.get(&id)
45    }
46
47    pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
48        self.jobs.get_mut(&id)
49    }
50
51    pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
52        if self.last_frozen_job_id.is_some_and(|last| id == last) {
53            self.last_frozen_job_id = None;
54        }
55
56        self.jobs.remove(&id)
57    }
58
59    fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
60        if let Job::Frozen(_) = job {
61            self.last_frozen_job_id = Some(id);
62        }
63    }
64
65    pub fn add_job(&mut self, job: Job) -> JobId {
66        let this_id = JobId::new(self.next_job_id);
67
68        self.assign_last_frozen_id_if_frozen(this_id, &job);
69
70        self.jobs.insert(this_id, job);
71        self.next_job_id += 1;
72
73        this_id
74    }
75
76    pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
77        self.last_frozen_job_id
78    }
79
80    // this is useful when you want to remove a job from the list and add it back later
81    pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
82        self.assign_last_frozen_id_if_frozen(id, &job);
83
84        if let std::collections::hash_map::Entry::Vacant(e) = self.jobs.entry(id) {
85            e.insert(job);
86            Ok(())
87        } else {
88            Err("job already exists")
89        }
90    }
91
92    /// This function tries to forcefully kill a job from this job table,
93    /// removes it from the job table. It always succeeds in removing the job
94    /// from the table, but may fail in killing the job's active processes.
95    pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
96        if let Some(job) = self.jobs.get(&id) {
97            let err = job.kill();
98
99            self.remove_job(id);
100
101            err?
102        }
103
104        Ok(())
105    }
106
107    /// This function tries to forcefully kill all the background jobs and
108    /// removes all of them from the job table.
109    ///
110    /// It returns an error if any of the job killing attempts fails, but always
111    /// succeeds in removing the jobs from the table.
112    pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
113        self.last_frozen_job_id = None;
114
115        let first_err = self
116            .iter()
117            .map(|(_, job)| job.kill().err())
118            .fold(None, |acc, x| acc.or(x));
119
120        self.jobs.clear();
121
122        if let Some(err) = first_err {
123            Err(err)
124        } else {
125            Ok(())
126        }
127    }
128}
129
/// A job that can be stored in the [`Jobs`] table.
pub enum Job {
    /// A job described by a [`ThreadJob`].
    Thread(ThreadJob),
    /// A job described by a [`FrozenJob`].
    Frozen(FrozenJob),
}
134
/// A thread job represents a job that is currently executing as a background thread in nushell.
/// This is an Arc-y type: cloning it does not uniquely clone the information of this particular
/// job.
//
// Although Rust's documentation does not document the acquire-release semantics of Mutex, this
// is a direct undocumented requirement of its soundness, and is thus assumed by this
// implementation.
// See issue https://github.com/rust-lang/rust/issues/126239.
#[derive(Clone)]
pub struct ThreadJob {
    // the signals of this job: triggered by `kill`, checked by `try_add_pid`.
    signals: Signals,
    // the set of active pids registered for this job; being behind an `Arc`,
    // it is shared by every clone of this `ThreadJob`.
    pids: Arc<Mutex<HashSet<u32>>>,
    // the optional tag of this job.
    tag: Option<String>,
    /// The sending half of a channel of [`Mail`].
    // NOTE(review): presumably paired with the receiver held by this job's `Mailbox`;
    // confirm at the place where the channel is created.
    pub sender: Sender<Mail>,
}
150
151impl ThreadJob {
152    pub fn new(signals: Signals, tag: Option<String>, sender: Sender<Mail>) -> Self {
153        ThreadJob {
154            signals,
155            pids: Arc::new(Mutex::new(HashSet::default())),
156            sender,
157            tag,
158        }
159    }
160
161    /// Tries to add the provided pid to the active pid set of the current job.
162    ///
163    /// Returns true if the pid was added successfully, or false if the
164    /// current job is interrupted.
165    pub fn try_add_pid(&self, pid: u32) -> bool {
166        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
167
168        // note: this signals check must occur after the pids lock has been locked.
169        if self.signals.interrupted() {
170            false
171        } else {
172            pids.insert(pid);
173            true
174        }
175    }
176
177    pub fn collect_pids(&self) -> Vec<u32> {
178        let lock = self.pids.lock().expect("PID lock was poisoned");
179
180        lock.iter().copied().collect()
181    }
182
183    pub fn kill(&self) -> shell_error::io::Result<()> {
184        // it's okay to make this interrupt outside of the mutex, since it has acquire-release
185        // semantics.
186
187        self.signals.trigger();
188
189        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
190
191        for pid in pids.iter() {
192            kill_by_pid((*pid).into())?;
193        }
194
195        pids.clear();
196
197        Ok(())
198    }
199
200    pub fn remove_pid(&self, pid: u32) {
201        let mut pids = self.pids.lock().expect("PID lock was poisoned");
202
203        pids.remove(&pid);
204    }
205}
206
207impl Job {
208    pub fn kill(&self) -> shell_error::io::Result<()> {
209        match self {
210            Job::Thread(thread_job) => thread_job.kill(),
211            Job::Frozen(frozen_job) => frozen_job.kill(),
212        }
213    }
214
215    pub fn tag(&self) -> Option<&String> {
216        match self {
217            Job::Thread(thread_job) => thread_job.tag.as_ref(),
218            Job::Frozen(frozen_job) => frozen_job.tag.as_ref(),
219        }
220    }
221
222    pub fn assign_tag(&mut self, tag: Option<String>) {
223        match self {
224            Job::Thread(thread_job) => thread_job.tag = tag,
225            Job::Frozen(frozen_job) => frozen_job.tag = tag,
226        }
227    }
228}
229
/// A job stored in the frozen state.
pub struct FrozenJob {
    /// The `nu_system` unfreeze handle of this job; on unix, `kill` reads the pid
    /// to signal from it.
    pub unfreeze: UnfreezeHandle,
    /// The optional tag of this job.
    pub tag: Option<String>,
}
234
235impl FrozenJob {
236    pub fn kill(&self) -> shell_error::io::Result<()> {
237        #[cfg(unix)]
238        {
239            Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
240        }
241
242        // it doesn't happen outside unix.
243        #[cfg(not(unix))]
244        {
245            Ok(())
246        }
247    }
248}
249
/// Stores the information about the background job currently being executed by this thread, if any.
#[derive(Clone)]
pub struct CurrentJob {
    /// The id of the current job.
    pub id: JobId,

    /// The background thread job associated with this thread.
    /// If `None`, it indicates this thread is currently the main job.
    pub background_thread_job: Option<ThreadJob>,

    /// The mailbox of the current job.
    ///
    /// Note: although the mailbox is Mutex'd, it is only ever accessed
    /// by the current job's threads.
    pub mailbox: Arc<Mutex<Mailbox>>,
}
263
/// The storage for unread messages.
///
/// Messages are initially sent over an mpsc channel,
/// and may then be stored in an `IgnoredMail` struct when
/// filtered out by a tag.
pub struct Mailbox {
    // the receiving half of the mpsc channel on which new mail arrives.
    receiver: Receiver<Mail>,
    // mail that was already taken off the channel but skipped by a tag filter;
    // it is consulted first by `recv_timeout` and `try_recv`.
    ignored_mail: IgnoredMail,
}
273
274impl Mailbox {
275    pub fn new(receiver: Receiver<Mail>) -> Self {
276        Mailbox {
277            receiver,
278            ignored_mail: IgnoredMail::default(),
279        }
280    }
281
282    #[cfg(not(target_family = "wasm"))]
283    pub fn recv_timeout(
284        &mut self,
285        filter_tag: Option<FilterTag>,
286        timeout: Duration,
287    ) -> Result<PipelineData, RecvTimeoutError> {
288        if let Some(value) = self.ignored_mail.pop(filter_tag) {
289            Ok(value)
290        } else {
291            let mut waited_so_far = Duration::ZERO;
292            let mut before = Instant::now();
293
294            while waited_so_far < timeout {
295                let (tag, value) = self.receiver.recv_timeout(timeout - waited_so_far)?;
296
297                if filter_tag.is_none() || filter_tag == tag {
298                    return Ok(value);
299                } else {
300                    self.ignored_mail.add((tag, value));
301                    let now = Instant::now();
302                    waited_so_far += now - before;
303                    before = now;
304                }
305            }
306
307            Err(RecvTimeoutError::Timeout)
308        }
309    }
310
311    #[cfg(not(target_family = "wasm"))]
312    pub fn try_recv(
313        &mut self,
314        filter_tag: Option<FilterTag>,
315    ) -> Result<PipelineData, TryRecvError> {
316        if let Some(value) = self.ignored_mail.pop(filter_tag) {
317            Ok(value)
318        } else {
319            loop {
320                let (tag, value) = self.receiver.try_recv()?;
321
322                if filter_tag.is_none() || filter_tag == tag {
323                    return Ok(value);
324                } else {
325                    self.ignored_mail.add((tag, value));
326                }
327            }
328        }
329    }
330
331    pub fn clear(&mut self) {
332        self.ignored_mail.clear();
333
334        while self.receiver.try_recv().is_ok() {}
335    }
336}
337
// A data structure used to store messages which were received, but are currently ignored by a
// tag filter. Messages are added and popped in a first-in-first-out manner.
#[derive(Default)]
struct IgnoredMail {
    // the id given to the next stored message; ids only grow, so among stored
    // messages a smaller id means an older message.
    next_id: usize,
    // every stored message, ordered by id.
    messages: BTreeMap<usize, Mail>,
    // for each tag, the ids of the stored messages carrying that tag.
    // untagged messages only appear in `messages`.
    by_tag: HashMap<FilterTag, BTreeSet<usize>>,
}
346
/// The tag a message can carry, and by which received mail can be filtered.
pub type FilterTag = u64;
/// A message: its optional tag paired with its payload.
pub type Mail = (Option<FilterTag>, PipelineData);
349
350impl IgnoredMail {
351    pub fn add(&mut self, (tag, value): Mail) {
352        let id = self.next_id;
353        self.next_id += 1;
354
355        self.messages.insert(id, (tag, value));
356
357        if let Some(tag) = tag {
358            self.by_tag.entry(tag).or_default().insert(id);
359        }
360    }
361
362    pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
363        if let Some(tag) = tag {
364            self.pop_oldest_with_tag(tag)
365        } else {
366            self.pop_oldest()
367        }
368    }
369
370    pub fn clear(&mut self) {
371        self.messages.clear();
372        self.by_tag.clear();
373    }
374
375    fn pop_oldest(&mut self) -> Option<PipelineData> {
376        let (id, (tag, value)) = self.messages.pop_first()?;
377
378        if let Some(tag) = tag {
379            let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) {
380                ids.remove(&id);
381                ids.is_empty()
382            } else {
383                false
384            };
385
386            if needs_cleanup {
387                self.by_tag.remove(&tag);
388            }
389        }
390
391        Some(value)
392    }
393
394    fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
395        let ids = self.by_tag.get_mut(&tag)?;
396
397        let id = ids.pop_first()?;
398
399        if ids.is_empty() {
400            self.by_tag.remove(&tag);
401        }
402
403        Some(self.messages.remove(&id)?.1)
404    }
405}