Skip to main content

nu_protocol/engine/
jobs.rs

1use std::{
2    collections::{BTreeMap, BTreeSet, HashMap, HashSet, hash_map::Entry},
3    sync::{
4        Arc, Mutex,
5        mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6    },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use nu_utils::time::Instant;
11#[cfg(not(target_family = "wasm"))]
12use std::time::Duration;
13
14use nu_system::{UnfreezeHandle, kill_by_pid};
15
16use crate::{PipelineData, Signals, shell_error};
17
18use crate::JobId;
19
20#[derive(Debug)]
21pub struct Jobs {
22    next_job_id: usize,
23
24    // this is the ID of the most recently added frozen job in the jobs table.
25    // the methods of this struct must ensure the invariant of this always
26    // being None or pointing to a valid job in the table
27    last_frozen_job_id: Option<JobId>,
28    jobs: HashMap<JobId, Job>,
29}
30
31impl Default for Jobs {
32    fn default() -> Self {
33        Self {
34            next_job_id: 1,
35            last_frozen_job_id: None,
36            jobs: HashMap::default(),
37        }
38    }
39}
40
41impl Jobs {
42    pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
43        self.jobs.iter().map(|(k, v)| (*k, v))
44    }
45
46    pub fn lookup(&self, id: JobId) -> Option<&Job> {
47        self.jobs.get(&id)
48    }
49
50    pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
51        self.jobs.get_mut(&id)
52    }
53
54    pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
55        if self.last_frozen_job_id.is_some_and(|last| id == last) {
56            self.last_frozen_job_id = None;
57        }
58
59        self.jobs.remove(&id)
60    }
61
62    fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
63        if let Job::Frozen(_) = job {
64            self.last_frozen_job_id = Some(id);
65        }
66    }
67
68    pub fn add_job(&mut self, job: Job) -> JobId {
69        let this_id = JobId::new(self.next_job_id);
70
71        self.assign_last_frozen_id_if_frozen(this_id, &job);
72
73        self.jobs.insert(this_id, job);
74        self.next_job_id += 1;
75
76        this_id
77    }
78
79    pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
80        self.last_frozen_job_id
81    }
82
83    // this is useful when you want to remove a job from the list and add it back later
84    pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
85        self.assign_last_frozen_id_if_frozen(id, &job);
86
87        if let Entry::Vacant(e) = self.jobs.entry(id) {
88            e.insert(job);
89            Ok(())
90        } else {
91            Err("job already exists")
92        }
93    }
94
95    /// This function tries to forcefully kill a job from this job table,
96    /// removes it from the job table. It always succeeds in removing the job
97    /// from the table, but may fail in killing the job's active processes.
98    pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
99        if let Some(job) = self.jobs.get(&id) {
100            let err = job.kill();
101
102            self.remove_job(id);
103
104            err?
105        }
106
107        Ok(())
108    }
109
110    /// This function tries to forcefully kill all the background jobs and
111    /// removes all of them from the job table.
112    ///
113    /// It returns an error if any of the job killing attempts fails, but always
114    /// succeeds in removing the jobs from the table.
115    pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
116        self.last_frozen_job_id = None;
117
118        let first_err = self
119            .iter()
120            .map(|(_, job)| job.kill().err())
121            .fold(None, |acc, x| acc.or(x));
122
123        self.jobs.clear();
124
125        if let Some(err) = first_err {
126            Err(err)
127        } else {
128            Ok(())
129        }
130    }
131}
132
133#[derive(Debug)]
134pub enum Job {
135    Thread(ThreadJob),
136    Frozen(FrozenJob),
137}
138
139// A thread job represents a job that is currently executing as a background thread in nushell.
140// This is an Arc-y type, cloning it does not uniquely clone the information of this particular
141// job.
142
143// Although rust's documentation does not document the acquire-release semantics of Mutex, this
144// is a direct undocumentented requirement of its soundness, and is thus assumed by this
145// implementaation.
146// see issue https://github.com/rust-lang/rust/issues/126239.
147#[derive(Clone, Debug)]
148pub struct ThreadJob {
149    signals: Signals,
150    pids: Arc<Mutex<HashSet<u32>>>,
151    description: Option<String>,
152    pub sender: Sender<Mail>,
153}
154
155impl ThreadJob {
156    pub fn new(signals: Signals, description: Option<String>, sender: Sender<Mail>) -> Self {
157        ThreadJob {
158            signals,
159            pids: Arc::new(Mutex::new(HashSet::default())),
160            sender,
161            description,
162        }
163    }
164
165    /// Tries to add the provided pid to the active pid set of the current job.
166    ///
167    /// Returns true if the pid was added successfully, or false if the
168    /// current job is interrupted.
169    pub fn try_add_pid(&self, pid: u32) -> bool {
170        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
171
172        // note: this signals check must occur after the pids lock has been locked.
173        if self.signals.interrupted() {
174            false
175        } else {
176            pids.insert(pid);
177            true
178        }
179    }
180
181    pub fn collect_pids(&self) -> Vec<u32> {
182        let lock = self.pids.lock().expect("PID lock was poisoned");
183
184        lock.iter().copied().collect()
185    }
186
187    pub fn kill(&self) -> shell_error::io::Result<()> {
188        // it's okay to make this interrupt outside of the mutex, since it has acquire-release
189        // semantics.
190
191        self.signals.trigger();
192
193        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
194
195        for pid in pids.iter() {
196            kill_by_pid((*pid).into())?;
197        }
198
199        pids.clear();
200
201        Ok(())
202    }
203
204    pub fn remove_pid(&self, pid: u32) {
205        let mut pids = self.pids.lock().expect("PID lock was poisoned");
206
207        pids.remove(&pid);
208    }
209}
210
211impl Job {
212    pub fn kill(&self) -> shell_error::io::Result<()> {
213        match self {
214            Job::Thread(thread_job) => thread_job.kill(),
215            Job::Frozen(frozen_job) => frozen_job.kill(),
216        }
217    }
218
219    pub fn description(&self) -> Option<&String> {
220        match self {
221            Job::Thread(thread_job) => thread_job.description.as_ref(),
222            Job::Frozen(frozen_job) => frozen_job.description.as_ref(),
223        }
224    }
225
226    pub fn assign_description(&mut self, description: Option<String>) {
227        match self {
228            Job::Thread(thread_job) => thread_job.description = description,
229            Job::Frozen(frozen_job) => frozen_job.description = description,
230        }
231    }
232}
233
234#[derive(Debug)]
235pub struct FrozenJob {
236    pub unfreeze: UnfreezeHandle,
237    pub description: Option<String>,
238}
239
240impl FrozenJob {
241    pub fn kill(&self) -> shell_error::io::Result<()> {
242        #[cfg(unix)]
243        {
244            Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
245        }
246
247        // it doesn't happen outside unix.
248        #[cfg(not(unix))]
249        {
250            Ok(())
251        }
252    }
253}
254
255/// Stores the information about the background job currently being executed by this thread, if any
256#[derive(Clone, Debug)]
257pub struct CurrentJob {
258    pub id: JobId,
259
260    // The background thread job associated with this thread.
261    // If None, it indicates this thread is currently the main job
262    pub background_thread_job: Option<ThreadJob>,
263
264    // note: although the mailbox is Mutex'd, it is only ever accessed
265    // by the current job's threads
266    pub mailbox: Arc<Mutex<Mailbox>>,
267}
268
269// The storage for unread messages
270//
271// Messages are initially sent over a mpsc channel,
272// and may then be stored in a IgnoredMail struct when
273// filtered out by a message tag.
274#[derive(Debug)]
275pub struct Mailbox {
276    receiver: Receiver<Mail>,
277    ignored_mail: IgnoredMail,
278}
279
280impl Mailbox {
281    pub fn new(receiver: Receiver<Mail>) -> Self {
282        Mailbox {
283            receiver,
284            ignored_mail: IgnoredMail::default(),
285        }
286    }
287
288    #[cfg(not(target_family = "wasm"))]
289    pub fn recv_timeout(
290        &mut self,
291        filter_tag: Option<FilterTag>,
292        timeout: Duration,
293    ) -> Result<PipelineData, RecvTimeoutError> {
294        if let Some(value) = self.ignored_mail.pop(filter_tag) {
295            Ok(value)
296        } else {
297            let mut waited_so_far = Duration::ZERO;
298            let mut before = Instant::now();
299
300            while waited_so_far < timeout {
301                let (tag, value) = self
302                    .receiver
303                    .recv_timeout(timeout.checked_sub(waited_so_far).unwrap_or(Duration::ZERO))?;
304
305                if filter_tag.is_none() || filter_tag == tag {
306                    return Ok(value);
307                } else {
308                    self.ignored_mail.add((tag, value));
309                    let now = Instant::now();
310                    waited_so_far += now - before;
311                    before = now;
312                }
313            }
314
315            Err(RecvTimeoutError::Timeout)
316        }
317    }
318
319    #[cfg(not(target_family = "wasm"))]
320    pub fn try_recv(
321        &mut self,
322        filter_tag: Option<FilterTag>,
323    ) -> Result<PipelineData, TryRecvError> {
324        if let Some(value) = self.ignored_mail.pop(filter_tag) {
325            Ok(value)
326        } else {
327            loop {
328                let (tag, value) = self.receiver.try_recv()?;
329
330                if filter_tag.is_none() || filter_tag == tag {
331                    return Ok(value);
332                } else {
333                    self.ignored_mail.add((tag, value));
334                }
335            }
336        }
337    }
338
339    pub fn clear(&mut self) {
340        self.ignored_mail.clear();
341
342        while self.receiver.try_recv().is_ok() {}
343    }
344}
345
346// A data structure used to store messages which were received, but currently ignored by a tag filter
347// messages are added and popped in a first-in-first-out matter.
348#[derive(Default, Debug)]
349struct IgnoredMail {
350    next_id: usize,
351    messages: BTreeMap<usize, Mail>,
352    by_tag: HashMap<FilterTag, BTreeSet<usize>>,
353}
354
355pub type FilterTag = u64;
356pub type Mail = (Option<FilterTag>, PipelineData);
357
358impl IgnoredMail {
359    pub fn add(&mut self, (tag, value): Mail) {
360        let id = self.next_id;
361        self.next_id += 1;
362
363        self.messages.insert(id, (tag, value));
364
365        if let Some(tag) = tag {
366            self.by_tag.entry(tag).or_default().insert(id);
367        }
368    }
369
370    pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
371        if let Some(tag) = tag {
372            self.pop_oldest_with_tag(tag)
373        } else {
374            self.pop_oldest()
375        }
376    }
377
378    pub fn clear(&mut self) {
379        self.messages.clear();
380        self.by_tag.clear();
381    }
382
383    fn pop_oldest(&mut self) -> Option<PipelineData> {
384        let (id, (tag, value)) = self.messages.pop_first()?;
385
386        if let Some(tag) = tag
387            && let Entry::Occupied(mut occupied_entry) = self.by_tag.entry(tag)
388        {
389            occupied_entry.get_mut().remove(&id);
390            if occupied_entry.get().is_empty() {
391                occupied_entry.remove();
392            }
393        }
394
395        Some(value)
396    }
397
398    fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
399        let ids = self.by_tag.get_mut(&tag)?;
400
401        let id = ids.pop_first()?;
402
403        if ids.is_empty() {
404            self.by_tag.remove(&tag);
405        }
406
407        Some(self.messages.remove(&id)?.1)
408    }
409}