Skip to main content

brush_core/
jobs.rs

1//! Job management
2
3use std::collections::VecDeque;
4use std::fmt::Display;
5
6use futures::FutureExt;
7
8use crate::ExecutionResult;
9use crate::error;
10use crate::processes;
11use crate::sys;
12use crate::trace_categories;
13use crate::traps;
14
15pub(crate) type JobJoinHandle = tokio::task::JoinHandle<Result<ExecutionResult, error::Error>>;
16pub(crate) type JobResult = (Job, Result<ExecutionResult, error::Error>);
17
18/// Manages the jobs that are currently managed by the shell.
19#[derive(Default)]
20pub struct JobManager {
21    /// The jobs that are currently managed by the shell.
22    pub jobs: Vec<Job>,
23}
24
25/// Represents a task that is part of a job.
26pub enum JobTask {
27    /// An external process.
28    External(processes::ChildProcess),
29    /// An internal asynchronous task.
30    Internal(JobJoinHandle),
31}
32
33/// Represents the result of waiting on a job task.
34pub enum JobTaskWaitResult {
35    /// The task has completed.
36    Completed(ExecutionResult),
37    /// The task was stopped.
38    Stopped,
39}
40
41impl JobTask {
42    /// Returns whether the task is an external process.
43    pub const fn is_external(&self) -> bool {
44        matches!(self, Self::External(_))
45    }
46
47    /// Waits for the task to complete. Returns the result of the wait.
48    pub async fn wait(&mut self) -> Result<JobTaskWaitResult, error::Error> {
49        match self {
50            Self::External(process) => {
51                let wait_result = process.wait().await?;
52                match wait_result {
53                    processes::ProcessWaitResult::Completed(output) => {
54                        Ok(JobTaskWaitResult::Completed(output.into()))
55                    }
56                    processes::ProcessWaitResult::Stopped => Ok(JobTaskWaitResult::Stopped),
57                }
58            }
59            Self::Internal(handle) => Ok(JobTaskWaitResult::Completed(handle.await??)),
60        }
61    }
62
63    /// Polls the task for completion. Returns `Some(result)` if the task has completed,
64    /// or `None` if it is still running. The result is the execution result of the task.
65    /// Behaves in a best-effort manner; if an internal error occurs during polling,
66    /// it will return `None`.
67    fn poll(&mut self) -> Option<Result<ExecutionResult, error::Error>> {
68        match self {
69            Self::External(process) => {
70                let check_result = process.poll();
71                check_result.map(|polled_result| polled_result.map(|output| output.into()))
72            }
73            Self::Internal(handle) => {
74                let checkable_handle = handle;
75                checkable_handle.now_or_never().and_then(|r| r.ok())
76            }
77        }
78    }
79}
80
81impl JobManager {
82    /// Returns a new job manager.
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Adds a job to the job manager and marks it as the current job;
88    /// returns an immutable reference to the job.
89    ///
90    /// # Arguments
91    ///
92    /// * `job` - The job to add.
93    #[allow(
94        clippy::missing_panics_doc,
95        reason = "push() guarantees the vector length is >= 1"
96    )]
97    pub fn add_as_current(&mut self, mut job: Job) -> &Job {
98        for j in &mut self.jobs {
99            if matches!(j.annotation, JobAnnotation::Current) {
100                j.annotation = JobAnnotation::Previous;
101                break;
102            }
103        }
104
105        let id = self.jobs.len() + 1;
106        job.id = id;
107        job.annotation = JobAnnotation::Current;
108        self.jobs.push(job);
109
110        #[allow(clippy::unwrap_used, reason = "we just pushed an element")]
111        self.jobs.last().unwrap()
112    }
113
114    /// Returns the current job, if there is one.
115    pub fn current_job(&self) -> Option<&Job> {
116        self.jobs
117            .iter()
118            .find(|j| matches!(j.annotation, JobAnnotation::Current))
119    }
120
121    /// Returns a mutable reference to the current job, if there is one.
122    pub fn current_job_mut(&mut self) -> Option<&mut Job> {
123        self.jobs
124            .iter_mut()
125            .find(|j| matches!(j.annotation, JobAnnotation::Current))
126    }
127
128    /// Returns the previous job, if there is one.
129    pub fn prev_job(&self) -> Option<&Job> {
130        self.jobs
131            .iter()
132            .find(|j| matches!(j.annotation, JobAnnotation::Previous))
133    }
134
135    /// Returns a mutable reference to the previous job, if there is one.
136    pub fn prev_job_mut(&mut self) -> Option<&mut Job> {
137        self.jobs
138            .iter_mut()
139            .find(|j| matches!(j.annotation, JobAnnotation::Previous))
140    }
141
142    /// Tries to resolve the given job specification to a job.
143    ///
144    /// # Arguments
145    ///
146    /// * `job_spec` - The job specification to resolve.
147    pub fn resolve_job_spec(&mut self, job_spec: &str) -> Option<&mut Job> {
148        let remainder = job_spec.strip_prefix('%')?;
149
150        match remainder {
151            "%" | "+" => self.current_job_mut(),
152            "-" => self.prev_job_mut(),
153            s if s.chars().all(char::is_numeric) => {
154                let id = s.parse::<usize>().ok()?;
155                self.jobs.iter_mut().find(|j| j.id == id)
156            }
157            _ => {
158                tracing::warn!(target: trace_categories::UNIMPLEMENTED, "unimplemented: job spec naming command: '{job_spec}'");
159                None
160            }
161        }
162    }
163
164    /// Waits for all managed jobs to complete.
165    pub async fn wait_all(&mut self) -> Result<Vec<Job>, error::Error> {
166        for job in &mut self.jobs {
167            job.wait().await?;
168        }
169
170        Ok(self.sweep_completed_jobs())
171    }
172
173    /// Polls all managed jobs for completion.
174    pub fn poll(&mut self) -> Result<Vec<JobResult>, error::Error> {
175        let mut results = vec![];
176
177        let mut i = 0;
178        while i != self.jobs.len() {
179            if let Some(result) = self.jobs[i].poll_done()? {
180                let job = self.jobs.remove(i);
181                results.push((job, result));
182            } else if matches!(self.jobs[i].state, JobState::Done) {
183                // TODO(jobs): This is a workaround to remove jobs that are done but for which we
184                // don't know what happened.
185                results.push((self.jobs.remove(i), Ok(ExecutionResult::success())));
186            } else {
187                i += 1;
188            }
189        }
190
191        Ok(results)
192    }
193
194    fn sweep_completed_jobs(&mut self) -> Vec<Job> {
195        let mut completed_jobs = vec![];
196
197        let mut i = 0;
198        while i != self.jobs.len() {
199            if self.jobs[i].tasks.is_empty() {
200                completed_jobs.push(self.jobs.remove(i));
201            } else {
202                i += 1;
203            }
204        }
205
206        completed_jobs
207    }
208}
209
210/// Represents the current execution state of a job.
211#[derive(Clone)]
212pub enum JobState {
213    /// Unknown state.
214    Unknown,
215    /// The job is running.
216    Running,
217    /// The job is stopped.
218    Stopped,
219    /// The job has completed.
220    Done,
221}
222
223impl Display for JobState {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        match self {
226            Self::Unknown => write!(f, "Unknown"),
227            Self::Running => write!(f, "Running"),
228            Self::Stopped => write!(f, "Stopped"),
229            Self::Done => write!(f, "Done"),
230        }
231    }
232}
233
234/// Represents an annotation for a job.
235#[derive(Clone)]
236pub enum JobAnnotation {
237    /// No annotation.
238    None,
239    /// The job is the current job.
240    Current,
241    /// The job is the previous job.
242    Previous,
243}
244
245impl Display for JobAnnotation {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        match self {
248            Self::None => write!(f, ""),
249            Self::Current => write!(f, "+"),
250            Self::Previous => write!(f, "-"),
251        }
252    }
253}
254
255/// Encapsulates a set of processes managed by the shell as a single unit.
256pub struct Job {
257    /// The tasks that make up the job.
258    tasks: VecDeque<JobTask>,
259
260    /// If available, the process group ID of the job's processes.
261    pgid: Option<sys::process::ProcessId>,
262
263    /// The annotation of the job (e.g., current, previous).
264    annotation: JobAnnotation,
265
266    /// The shell-internal ID of the job.
267    pub id: usize,
268
269    /// The command line of the job.
270    pub command_line: String,
271
272    /// The current operational state of the job.
273    pub state: JobState,
274}
275
276impl Display for Job {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        write!(
279            f,
280            "[{}]{:3}{}\t{}",
281            self.id,
282            self.annotation.to_string(),
283            self.state,
284            self.command_line
285        )
286    }
287}
288
289impl Job {
290    /// Returns a new job object.
291    ///
292    /// # Arguments
293    ///
294    /// * `children` - The job's known child processes.
295    /// * `command_line` - The command line of the job.
296    /// * `state` - The current operational state of the job.
297    pub(crate) fn new<I>(tasks: I, command_line: String, state: JobState) -> Self
298    where
299        I: IntoIterator<Item = JobTask>,
300    {
301        Self {
302            id: 0,
303            tasks: tasks.into_iter().collect(),
304            pgid: None,
305            annotation: JobAnnotation::None,
306            command_line,
307            state,
308        }
309    }
310
311    /// Returns a pid-style string for the job.
312    pub fn to_pid_style_string(&self) -> String {
313        let display_pid = self
314            .representative_pid()
315            .map_or_else(|| String::from("<pid unknown>"), |pid| pid.to_string());
316        std::format!("[{}]{}\t{}", self.id, self.annotation, display_pid)
317    }
318
319    /// Returns the annotation of the job.
320    pub fn annotation(&self) -> JobAnnotation {
321        self.annotation.clone()
322    }
323
324    /// Returns the command name of the job.
325    pub fn command_name(&self) -> &str {
326        self.command_line
327            .split_ascii_whitespace()
328            .next()
329            .unwrap_or_default()
330    }
331
332    /// Returns whether the job is the current job.
333    pub const fn is_current(&self) -> bool {
334        matches!(self.annotation, JobAnnotation::Current)
335    }
336
337    /// Returns whether the job is the previous job.
338    pub const fn is_prev(&self) -> bool {
339        matches!(self.annotation, JobAnnotation::Previous)
340    }
341
342    /// Polls whether the job has completed.
343    pub fn poll_done(
344        &mut self,
345    ) -> Result<Option<Result<ExecutionResult, error::Error>>, error::Error> {
346        let mut result: Option<Result<ExecutionResult, error::Error>> = None;
347
348        tracing::debug!(target: trace_categories::JOBS, "Polling job {} for completion...", self.id);
349
350        while !self.tasks.is_empty() {
351            let task = &mut self.tasks[0];
352            match task.poll() {
353                Some(r) => {
354                    self.tasks.remove(0);
355                    result = Some(r);
356                }
357                None => {
358                    return Ok(None);
359                }
360            }
361        }
362
363        tracing::debug!(target: trace_categories::JOBS, "Job {} has completed.", self.id);
364
365        self.state = JobState::Done;
366
367        Ok(result)
368    }
369
370    /// Waits for the job to complete.
371    pub async fn wait(&mut self) -> Result<ExecutionResult, error::Error> {
372        let mut result = ExecutionResult::success();
373
374        while let Some(task) = self.tasks.back_mut() {
375            match task.wait().await? {
376                JobTaskWaitResult::Completed(execution_result) => {
377                    result = execution_result;
378                    self.tasks.pop_back();
379                }
380                JobTaskWaitResult::Stopped => {
381                    self.state = JobState::Stopped;
382                    return Ok(ExecutionResult::stopped());
383                }
384            }
385        }
386
387        self.state = JobState::Done;
388
389        Ok(result)
390    }
391
392    /// Moves the job to execute in the background.
393    pub fn move_to_background(&mut self) -> Result<(), error::Error> {
394        if matches!(self.state, JobState::Stopped) {
395            if let Some(pgid) = self.process_group_id() {
396                sys::signal::continue_process(pgid)?;
397                self.state = JobState::Running;
398                Ok(())
399            } else {
400                Err(error::ErrorKind::FailedToSendSignal.into())
401            }
402        } else {
403            error::unimp("move job to background")
404        }
405    }
406
407    /// Moves the job to execute in the foreground.
408    pub fn move_to_foreground(&mut self) -> Result<(), error::Error> {
409        if matches!(self.state, JobState::Stopped) {
410            if let Some(pgid) = self.process_group_id() {
411                sys::signal::continue_process(pgid)?;
412                self.state = JobState::Running;
413            } else {
414                return Err(error::ErrorKind::FailedToSendSignal.into());
415            }
416        }
417
418        if let Some(pgid) = self.process_group_id() {
419            sys::terminal::move_to_foreground(pgid)?;
420        }
421
422        Ok(())
423    }
424
425    /// Kills the job.
426    ///
427    /// # Arguments
428    ///
429    /// * `signal` - The signal to send to the job.
430    pub fn kill(&self, signal: traps::TrapSignal) -> Result<(), error::Error> {
431        if let Some(pid) = self.process_group_id() {
432            sys::signal::kill_process(pid, signal)
433        } else {
434            Err(error::ErrorKind::FailedToSendSignal.into())
435        }
436    }
437
438    /// Tries to retrieve a "representative" pid for the job.
439    pub fn representative_pid(&self) -> Option<sys::process::ProcessId> {
440        for task in &self.tasks {
441            match task {
442                JobTask::External(p) => {
443                    if let Some(pid) = p.pid() {
444                        return Some(pid);
445                    }
446                }
447                JobTask::Internal(_) => (),
448            }
449        }
450        None
451    }
452
453    /// Tries to retrieve the process group ID (PGID) of the job.
454    pub fn process_group_id(&self) -> Option<sys::process::ProcessId> {
455        // TODO(jobs): Don't assume that the first PID is the PGID.
456        self.pgid.or_else(|| self.representative_pid())
457    }
458}