nu_protocol/engine/
jobs.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{Arc, Mutex},
4};
5
6use nu_system::{kill_by_pid, UnfreezeHandle};
7
8use crate::Signals;
9
10use crate::JobId;
11
/// Table of jobs, keyed by [`JobId`].
///
/// Besides the jobs themselves, the table tracks the ID of the most recently
/// added frozen job.
pub struct Jobs {
    // numeric value used to build the `JobId` handed out by the next call to
    // `add_job`; it is incremented after every such call.
    next_job_id: usize,

    // this is the ID of the most recently added frozen job in the jobs table.
    // the methods of this struct must ensure the invariant of this always
    // being None or pointing to a valid job in the table
    last_frozen_job_id: Option<JobId>,

    // every job currently stored in the table.
    jobs: HashMap<JobId, Job>,
}
21
22impl Default for Jobs {
23    fn default() -> Self {
24        Self {
25            next_job_id: 1,
26            last_frozen_job_id: None,
27            jobs: HashMap::default(),
28        }
29    }
30}
31
32impl Jobs {
33    pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
34        self.jobs.iter().map(|(k, v)| (*k, v))
35    }
36
37    pub fn lookup(&self, id: JobId) -> Option<&Job> {
38        self.jobs.get(&id)
39    }
40
41    pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
42        if self.last_frozen_job_id.is_some_and(|last| id == last) {
43            self.last_frozen_job_id = None;
44        }
45
46        self.jobs.remove(&id)
47    }
48
49    fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
50        if let Job::Frozen(_) = job {
51            self.last_frozen_job_id = Some(id);
52        }
53    }
54
55    pub fn add_job(&mut self, job: Job) -> JobId {
56        let this_id = JobId::new(self.next_job_id);
57
58        self.assign_last_frozen_id_if_frozen(this_id, &job);
59
60        self.jobs.insert(this_id, job);
61        self.next_job_id += 1;
62
63        this_id
64    }
65
66    pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
67        self.last_frozen_job_id
68    }
69
70    // this is useful when you want to remove a job from the list and add it back later
71    pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
72        self.assign_last_frozen_id_if_frozen(id, &job);
73
74        if let std::collections::hash_map::Entry::Vacant(e) = self.jobs.entry(id) {
75            e.insert(job);
76            Ok(())
77        } else {
78            Err("job already exists")
79        }
80    }
81
82    /// This function tries to forcefully kill a job from this job table,
83    /// removes it from the job table. It always succeeds in removing the job
84    /// from the table, but may fail in killing the job's active processes.
85    pub fn kill_and_remove(&mut self, id: JobId) -> std::io::Result<()> {
86        if let Some(job) = self.jobs.get(&id) {
87            let err = job.kill();
88
89            self.remove_job(id);
90
91            err?
92        }
93
94        Ok(())
95    }
96
97    /// This function tries to forcefully kill all the background jobs and
98    /// removes all of them from the job table.
99    ///
100    /// It returns an error if any of the job killing attempts fails, but always
101    /// succeeds in removing the jobs from the table.
102    pub fn kill_all(&mut self) -> std::io::Result<()> {
103        self.last_frozen_job_id = None;
104
105        self.jobs.clear();
106
107        let first_err = self
108            .iter()
109            .map(|(_, job)| job.kill().err())
110            .fold(None, |acc, x| acc.or(x));
111
112        if let Some(err) = first_err {
113            Err(err)
114        } else {
115            Ok(())
116        }
117    }
118}
119
/// A single entry of the jobs table.
pub enum Job {
    /// A job backed by a [`ThreadJob`].
    Thread(ThreadJob),
    /// A job backed by a [`FrozenJob`].
    Frozen(FrozenJob),
}
124
/// A thread job represents a job that is currently executing as a background thread in nushell.
/// This is an Arc-y type, cloning it does not uniquely clone the information of this particular
/// job.
///
/// Although Rust's documentation does not document the acquire-release semantics of Mutex, this
/// is a direct undocumented requirement of its soundness, and is thus assumed by this
/// implementation.
/// See issue https://github.com/rust-lang/rust/issues/126239.
#[derive(Clone)]
pub struct ThreadJob {
    // checked in `try_add_pid` and triggered in `kill`.
    signals: Signals,
    // set of active pids registered for this job; shared between clones
    // through the `Arc` and guarded by the mutex.
    pids: Arc<Mutex<HashSet<u32>>>,
}
138
139impl ThreadJob {
140    pub fn new(signals: Signals) -> Self {
141        ThreadJob {
142            signals,
143            pids: Arc::new(Mutex::new(HashSet::default())),
144        }
145    }
146
147    /// Tries to add the provided pid to the active pid set of the current job.
148    ///
149    /// Returns true if the pid was added successfully, or false if the
150    /// current job is interrupted.
151    pub fn try_add_pid(&self, pid: u32) -> bool {
152        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
153
154        // note: this signals check must occur after the pids lock has been locked.
155        if self.signals.interrupted() {
156            false
157        } else {
158            pids.insert(pid);
159            true
160        }
161    }
162
163    pub fn collect_pids(&self) -> Vec<u32> {
164        let lock = self.pids.lock().expect("PID lock was poisoned");
165
166        lock.iter().copied().collect()
167    }
168
169    pub fn kill(&self) -> std::io::Result<()> {
170        // it's okay to make this interrupt outside of the mutex, since it has acquire-release
171        // semantics.
172
173        self.signals.trigger();
174
175        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
176
177        for pid in pids.iter() {
178            kill_by_pid((*pid).into())?;
179        }
180
181        pids.clear();
182
183        Ok(())
184    }
185
186    pub fn remove_pid(&self, pid: u32) {
187        let mut pids = self.pids.lock().expect("PID lock was poisoned");
188
189        pids.remove(&pid);
190    }
191}
192
193impl Job {
194    pub fn kill(&self) -> std::io::Result<()> {
195        match self {
196            Job::Thread(thread_job) => thread_job.kill(),
197            Job::Frozen(frozen_job) => frozen_job.kill(),
198        }
199    }
200}
201
/// A frozen job, represented by its [`UnfreezeHandle`].
pub struct FrozenJob {
    /// Handle of the frozen job; on unix its `pid()` identifies the process
    /// that `kill` targets.
    // NOTE(review): presumably this handle is also what resumes the job —
    // confirm against `nu_system`.
    pub unfreeze: UnfreezeHandle,
}
205
206impl FrozenJob {
207    pub fn kill(&self) -> std::io::Result<()> {
208        #[cfg(unix)]
209        {
210            kill_by_pid(self.unfreeze.pid() as i64)
211        }
212
213        // it doesn't happen outside unix.
214        #[cfg(not(unix))]
215        {
216            Ok(())
217        }
218    }
219}