Skip to main content

nu_protocol/engine/
jobs.rs

1use std::{
2    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3    sync::{
4        Arc, Mutex,
5        mpsc::{Receiver, RecvTimeoutError, Sender, TryRecvError},
6    },
7};
8
9#[cfg(not(target_family = "wasm"))]
10use nu_utils::time::Instant;
11#[cfg(not(target_family = "wasm"))]
12use std::time::Duration;
13
14use nu_system::{UnfreezeHandle, kill_by_pid};
15
16use crate::{PipelineData, Signals, shell_error};
17
18use crate::JobId;
19
20pub struct Jobs {
21    next_job_id: usize,
22
23    // this is the ID of the most recently added frozen job in the jobs table.
24    // the methods of this struct must ensure the invariant of this always
25    // being None or pointing to a valid job in the table
26    last_frozen_job_id: Option<JobId>,
27    jobs: HashMap<JobId, Job>,
28}
29
30impl Default for Jobs {
31    fn default() -> Self {
32        Self {
33            next_job_id: 1,
34            last_frozen_job_id: None,
35            jobs: HashMap::default(),
36        }
37    }
38}
39
40impl Jobs {
41    pub fn iter(&self) -> impl Iterator<Item = (JobId, &Job)> {
42        self.jobs.iter().map(|(k, v)| (*k, v))
43    }
44
45    pub fn lookup(&self, id: JobId) -> Option<&Job> {
46        self.jobs.get(&id)
47    }
48
49    pub fn lookup_mut(&mut self, id: JobId) -> Option<&mut Job> {
50        self.jobs.get_mut(&id)
51    }
52
53    pub fn remove_job(&mut self, id: JobId) -> Option<Job> {
54        if self.last_frozen_job_id.is_some_and(|last| id == last) {
55            self.last_frozen_job_id = None;
56        }
57
58        self.jobs.remove(&id)
59    }
60
61    fn assign_last_frozen_id_if_frozen(&mut self, id: JobId, job: &Job) {
62        if let Job::Frozen(_) = job {
63            self.last_frozen_job_id = Some(id);
64        }
65    }
66
67    pub fn add_job(&mut self, job: Job) -> JobId {
68        let this_id = JobId::new(self.next_job_id);
69
70        self.assign_last_frozen_id_if_frozen(this_id, &job);
71
72        self.jobs.insert(this_id, job);
73        self.next_job_id += 1;
74
75        this_id
76    }
77
78    pub fn most_recent_frozen_job_id(&mut self) -> Option<JobId> {
79        self.last_frozen_job_id
80    }
81
82    // this is useful when you want to remove a job from the list and add it back later
83    pub fn add_job_with_id(&mut self, id: JobId, job: Job) -> Result<(), &'static str> {
84        self.assign_last_frozen_id_if_frozen(id, &job);
85
86        if let std::collections::hash_map::Entry::Vacant(e) = self.jobs.entry(id) {
87            e.insert(job);
88            Ok(())
89        } else {
90            Err("job already exists")
91        }
92    }
93
94    /// This function tries to forcefully kill a job from this job table,
95    /// removes it from the job table. It always succeeds in removing the job
96    /// from the table, but may fail in killing the job's active processes.
97    pub fn kill_and_remove(&mut self, id: JobId) -> shell_error::io::Result<()> {
98        if let Some(job) = self.jobs.get(&id) {
99            let err = job.kill();
100
101            self.remove_job(id);
102
103            err?
104        }
105
106        Ok(())
107    }
108
109    /// This function tries to forcefully kill all the background jobs and
110    /// removes all of them from the job table.
111    ///
112    /// It returns an error if any of the job killing attempts fails, but always
113    /// succeeds in removing the jobs from the table.
114    pub fn kill_all(&mut self) -> shell_error::io::Result<()> {
115        self.last_frozen_job_id = None;
116
117        let first_err = self
118            .iter()
119            .map(|(_, job)| job.kill().err())
120            .fold(None, |acc, x| acc.or(x));
121
122        self.jobs.clear();
123
124        if let Some(err) = first_err {
125            Err(err)
126        } else {
127            Ok(())
128        }
129    }
130}
131
132pub enum Job {
133    Thread(ThreadJob),
134    Frozen(FrozenJob),
135}
136
137// A thread job represents a job that is currently executing as a background thread in nushell.
138// This is an Arc-y type, cloning it does not uniquely clone the information of this particular
139// job.
140
141// Although rust's documentation does not document the acquire-release semantics of Mutex, this
142// is a direct undocumentented requirement of its soundness, and is thus assumed by this
143// implementaation.
144// see issue https://github.com/rust-lang/rust/issues/126239.
145#[derive(Clone)]
146pub struct ThreadJob {
147    signals: Signals,
148    pids: Arc<Mutex<HashSet<u32>>>,
149    description: Option<String>,
150    pub sender: Sender<Mail>,
151}
152
153impl ThreadJob {
154    pub fn new(signals: Signals, description: Option<String>, sender: Sender<Mail>) -> Self {
155        ThreadJob {
156            signals,
157            pids: Arc::new(Mutex::new(HashSet::default())),
158            sender,
159            description,
160        }
161    }
162
163    /// Tries to add the provided pid to the active pid set of the current job.
164    ///
165    /// Returns true if the pid was added successfully, or false if the
166    /// current job is interrupted.
167    pub fn try_add_pid(&self, pid: u32) -> bool {
168        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
169
170        // note: this signals check must occur after the pids lock has been locked.
171        if self.signals.interrupted() {
172            false
173        } else {
174            pids.insert(pid);
175            true
176        }
177    }
178
179    pub fn collect_pids(&self) -> Vec<u32> {
180        let lock = self.pids.lock().expect("PID lock was poisoned");
181
182        lock.iter().copied().collect()
183    }
184
185    pub fn kill(&self) -> shell_error::io::Result<()> {
186        // it's okay to make this interrupt outside of the mutex, since it has acquire-release
187        // semantics.
188
189        self.signals.trigger();
190
191        let mut pids = self.pids.lock().expect("PIDs lock was poisoned");
192
193        for pid in pids.iter() {
194            kill_by_pid((*pid).into())?;
195        }
196
197        pids.clear();
198
199        Ok(())
200    }
201
202    pub fn remove_pid(&self, pid: u32) {
203        let mut pids = self.pids.lock().expect("PID lock was poisoned");
204
205        pids.remove(&pid);
206    }
207}
208
209impl Job {
210    pub fn kill(&self) -> shell_error::io::Result<()> {
211        match self {
212            Job::Thread(thread_job) => thread_job.kill(),
213            Job::Frozen(frozen_job) => frozen_job.kill(),
214        }
215    }
216
217    pub fn description(&self) -> Option<&String> {
218        match self {
219            Job::Thread(thread_job) => thread_job.description.as_ref(),
220            Job::Frozen(frozen_job) => frozen_job.description.as_ref(),
221        }
222    }
223
224    pub fn assign_description(&mut self, description: Option<String>) {
225        match self {
226            Job::Thread(thread_job) => thread_job.description = description,
227            Job::Frozen(frozen_job) => frozen_job.description = description,
228        }
229    }
230}
231
232pub struct FrozenJob {
233    pub unfreeze: UnfreezeHandle,
234    pub description: Option<String>,
235}
236
237impl FrozenJob {
238    pub fn kill(&self) -> shell_error::io::Result<()> {
239        #[cfg(unix)]
240        {
241            Ok(kill_by_pid(self.unfreeze.pid() as i64)?)
242        }
243
244        // it doesn't happen outside unix.
245        #[cfg(not(unix))]
246        {
247            Ok(())
248        }
249    }
250}
251
252/// Stores the information about the background job currently being executed by this thread, if any
253#[derive(Clone)]
254pub struct CurrentJob {
255    pub id: JobId,
256
257    // The background thread job associated with this thread.
258    // If None, it indicates this thread is currently the main job
259    pub background_thread_job: Option<ThreadJob>,
260
261    // note: although the mailbox is Mutex'd, it is only ever accessed
262    // by the current job's threads
263    pub mailbox: Arc<Mutex<Mailbox>>,
264}
265
266// The storage for unread messages
267//
268// Messages are initially sent over a mpsc channel,
269// and may then be stored in a IgnoredMail struct when
270// filtered out by a message tag.
271pub struct Mailbox {
272    receiver: Receiver<Mail>,
273    ignored_mail: IgnoredMail,
274}
275
276impl Mailbox {
277    pub fn new(receiver: Receiver<Mail>) -> Self {
278        Mailbox {
279            receiver,
280            ignored_mail: IgnoredMail::default(),
281        }
282    }
283
284    #[cfg(not(target_family = "wasm"))]
285    pub fn recv_timeout(
286        &mut self,
287        filter_tag: Option<FilterTag>,
288        timeout: Duration,
289    ) -> Result<PipelineData, RecvTimeoutError> {
290        if let Some(value) = self.ignored_mail.pop(filter_tag) {
291            Ok(value)
292        } else {
293            let mut waited_so_far = Duration::ZERO;
294            let mut before = Instant::now();
295
296            while waited_so_far < timeout {
297                let (tag, value) = self
298                    .receiver
299                    .recv_timeout(timeout.checked_sub(waited_so_far).unwrap_or(Duration::ZERO))?;
300
301                if filter_tag.is_none() || filter_tag == tag {
302                    return Ok(value);
303                } else {
304                    self.ignored_mail.add((tag, value));
305                    let now = Instant::now();
306                    waited_so_far += now - before;
307                    before = now;
308                }
309            }
310
311            Err(RecvTimeoutError::Timeout)
312        }
313    }
314
315    #[cfg(not(target_family = "wasm"))]
316    pub fn try_recv(
317        &mut self,
318        filter_tag: Option<FilterTag>,
319    ) -> Result<PipelineData, TryRecvError> {
320        if let Some(value) = self.ignored_mail.pop(filter_tag) {
321            Ok(value)
322        } else {
323            loop {
324                let (tag, value) = self.receiver.try_recv()?;
325
326                if filter_tag.is_none() || filter_tag == tag {
327                    return Ok(value);
328                } else {
329                    self.ignored_mail.add((tag, value));
330                }
331            }
332        }
333    }
334
335    pub fn clear(&mut self) {
336        self.ignored_mail.clear();
337
338        while self.receiver.try_recv().is_ok() {}
339    }
340}
341
342// A data structure used to store messages which were received, but currently ignored by a tag filter
343// messages are added and popped in a first-in-first-out matter.
344#[derive(Default)]
345struct IgnoredMail {
346    next_id: usize,
347    messages: BTreeMap<usize, Mail>,
348    by_tag: HashMap<FilterTag, BTreeSet<usize>>,
349}
350
351pub type FilterTag = u64;
352pub type Mail = (Option<FilterTag>, PipelineData);
353
354impl IgnoredMail {
355    pub fn add(&mut self, (tag, value): Mail) {
356        let id = self.next_id;
357        self.next_id += 1;
358
359        self.messages.insert(id, (tag, value));
360
361        if let Some(tag) = tag {
362            self.by_tag.entry(tag).or_default().insert(id);
363        }
364    }
365
366    pub fn pop(&mut self, tag: Option<FilterTag>) -> Option<PipelineData> {
367        if let Some(tag) = tag {
368            self.pop_oldest_with_tag(tag)
369        } else {
370            self.pop_oldest()
371        }
372    }
373
374    pub fn clear(&mut self) {
375        self.messages.clear();
376        self.by_tag.clear();
377    }
378
379    fn pop_oldest(&mut self) -> Option<PipelineData> {
380        let (id, (tag, value)) = self.messages.pop_first()?;
381
382        if let Some(tag) = tag {
383            let needs_cleanup = if let Some(ids) = self.by_tag.get_mut(&tag) {
384                ids.remove(&id);
385                ids.is_empty()
386            } else {
387                false
388            };
389
390            if needs_cleanup {
391                self.by_tag.remove(&tag);
392            }
393        }
394
395        Some(value)
396    }
397
398    fn pop_oldest_with_tag(&mut self, tag: FilterTag) -> Option<PipelineData> {
399        let ids = self.by_tag.get_mut(&tag)?;
400
401        let id = ids.pop_first()?;
402
403        if ids.is_empty() {
404            self.by_tag.remove(&tag);
405        }
406
407        Some(self.messages.remove(&id)?.1)
408    }
409}