brush_core/
jobs.rs

1//! Job management
2
3use std::collections::VecDeque;
4use std::fmt::Display;
5
6use futures::FutureExt;
7
8use crate::ExecutionResult;
9use crate::error;
10use crate::processes;
11use crate::sys;
12use crate::trace_categories;
13use crate::traps;
14
15pub(crate) type JobJoinHandle = tokio::task::JoinHandle<Result<ExecutionResult, error::Error>>;
16pub(crate) type JobResult = (Job, Result<ExecutionResult, error::Error>);
17
18/// Manages the jobs that are currently managed by the shell.
19#[derive(Default)]
20pub struct JobManager {
21    /// The jobs that are currently managed by the shell.
22    pub jobs: Vec<Job>,
23}
24
25/// Represents a task that is part of a job.
26pub enum JobTask {
27    /// An external process.
28    External(processes::ChildProcess),
29    /// An internal asynchronous task.
30    Internal(JobJoinHandle),
31}
32
33/// Represents the result of waiting on a job task.
34pub enum JobTaskWaitResult {
35    /// The task has completed.
36    Completed(ExecutionResult),
37    /// The task was stopped.
38    Stopped,
39}
40
41impl JobTask {
42    /// Waits for the task to complete. Returns the result of the wait.
43    pub async fn wait(&mut self) -> Result<JobTaskWaitResult, error::Error> {
44        match self {
45            Self::External(process) => {
46                let wait_result = process.wait().await?;
47                match wait_result {
48                    processes::ProcessWaitResult::Completed(output) => {
49                        Ok(JobTaskWaitResult::Completed(output.into()))
50                    }
51                    processes::ProcessWaitResult::Stopped => Ok(JobTaskWaitResult::Stopped),
52                }
53            }
54            Self::Internal(handle) => Ok(JobTaskWaitResult::Completed(handle.await??)),
55        }
56    }
57
58    #[allow(clippy::unwrap_in_result)]
59    fn poll(&mut self) -> Option<Result<ExecutionResult, error::Error>> {
60        match self {
61            Self::External(process) => {
62                let check_result = process.poll();
63                check_result.map(|polled_result| polled_result.map(|output| output.into()))
64            }
65            Self::Internal(handle) => {
66                let checkable_handle = handle;
67                checkable_handle.now_or_never().map(|r| r.unwrap())
68            }
69        }
70    }
71}
72
73impl JobManager {
74    /// Returns a new job manager.
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Adds a job to the job manager and marks it as the current job;
80    /// returns an immutable reference to the job.
81    ///
82    /// # Arguments
83    ///
84    /// * `job` - The job to add.
85    pub fn add_as_current(&mut self, mut job: Job) -> &Job {
86        for j in &mut self.jobs {
87            if matches!(j.annotation, JobAnnotation::Current) {
88                j.annotation = JobAnnotation::Previous;
89                break;
90            }
91        }
92
93        let id = self.jobs.len() + 1;
94        job.id = id;
95        job.annotation = JobAnnotation::Current;
96        self.jobs.push(job);
97        self.jobs.last().unwrap()
98    }
99
100    /// Returns the current job, if there is one.
101    pub fn current_job(&self) -> Option<&Job> {
102        self.jobs
103            .iter()
104            .find(|j| matches!(j.annotation, JobAnnotation::Current))
105    }
106
107    /// Returns a mutable reference to the current job, if there is one.
108    pub fn current_job_mut(&mut self) -> Option<&mut Job> {
109        self.jobs
110            .iter_mut()
111            .find(|j| matches!(j.annotation, JobAnnotation::Current))
112    }
113
114    /// Returns the previous job, if there is one.
115    pub fn prev_job(&self) -> Option<&Job> {
116        self.jobs
117            .iter()
118            .find(|j| matches!(j.annotation, JobAnnotation::Previous))
119    }
120
121    /// Returns a mutable reference to the previous job, if there is one.
122    pub fn prev_job_mut(&mut self) -> Option<&mut Job> {
123        self.jobs
124            .iter_mut()
125            .find(|j| matches!(j.annotation, JobAnnotation::Previous))
126    }
127
128    /// Tries to resolve the given job specification to a job.
129    ///
130    /// # Arguments
131    ///
132    /// * `job_spec` - The job specification to resolve.
133    pub fn resolve_job_spec(&mut self, job_spec: &str) -> Option<&mut Job> {
134        let remainder = job_spec.strip_prefix('%')?;
135
136        match remainder {
137            "%" | "+" => self.current_job_mut(),
138            "-" => self.prev_job_mut(),
139            s if s.chars().all(char::is_numeric) => {
140                let id = s.parse::<usize>().ok()?;
141                self.jobs.iter_mut().find(|j| j.id == id)
142            }
143            _ => {
144                tracing::warn!(target: trace_categories::UNIMPLEMENTED, "unimplemented: job spec naming command: '{job_spec}'");
145                None
146            }
147        }
148    }
149
150    /// Waits for all managed jobs to complete.
151    pub async fn wait_all(&mut self) -> Result<Vec<Job>, error::Error> {
152        for job in &mut self.jobs {
153            job.wait().await?;
154        }
155
156        Ok(self.sweep_completed_jobs())
157    }
158
159    /// Polls all managed jobs for completion.
160    pub fn poll(&mut self) -> Result<Vec<JobResult>, error::Error> {
161        let mut results = vec![];
162
163        let mut i = 0;
164        while i != self.jobs.len() {
165            if let Some(result) = self.jobs[i].poll_done()? {
166                let job = self.jobs.remove(i);
167                results.push((job, result));
168            } else if matches!(self.jobs[i].state, JobState::Done) {
169                // TODO: This is a workaround to remove jobs that are done but for which we don't
170                // know what happened.
171                results.push((self.jobs.remove(i), Ok(ExecutionResult::success())));
172            } else {
173                i += 1;
174            }
175        }
176
177        Ok(results)
178    }
179
180    fn sweep_completed_jobs(&mut self) -> Vec<Job> {
181        let mut completed_jobs = vec![];
182
183        let mut i = 0;
184        while i != self.jobs.len() {
185            if self.jobs[i].tasks.is_empty() {
186                completed_jobs.push(self.jobs.remove(i));
187            } else {
188                i += 1;
189            }
190        }
191
192        completed_jobs
193    }
194}
195
196/// Represents the current execution state of a job.
197#[derive(Clone)]
198pub enum JobState {
199    /// Unknown state.
200    Unknown,
201    /// The job is running.
202    Running,
203    /// The job is stopped.
204    Stopped,
205    /// The job has completed.
206    Done,
207}
208
209impl Display for JobState {
210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211        match self {
212            Self::Unknown => write!(f, "Unknown"),
213            Self::Running => write!(f, "Running"),
214            Self::Stopped => write!(f, "Stopped"),
215            Self::Done => write!(f, "Done"),
216        }
217    }
218}
219
220/// Represents an annotation for a job.
221#[derive(Clone)]
222pub enum JobAnnotation {
223    /// No annotation.
224    None,
225    /// The job is the current job.
226    Current,
227    /// The job is the previous job.
228    Previous,
229}
230
231impl Display for JobAnnotation {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        match self {
234            Self::None => write!(f, ""),
235            Self::Current => write!(f, "+"),
236            Self::Previous => write!(f, "-"),
237        }
238    }
239}
240
241/// Encapsulates a set of processes managed by the shell as a single unit.
242pub struct Job {
243    /// The tasks that make up the job.
244    tasks: VecDeque<JobTask>,
245
246    /// If available, the process group ID of the job's processes.
247    pgid: Option<sys::process::ProcessId>,
248
249    /// The annotation of the job (e.g., current, previous).
250    annotation: JobAnnotation,
251
252    /// The shell-internal ID of the job.
253    pub id: usize,
254
255    /// The command line of the job.
256    pub command_line: String,
257
258    /// The current operational state of the job.
259    pub state: JobState,
260}
261
262impl Display for Job {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        write!(
265            f,
266            "[{}]{:3}{}\t{}",
267            self.id,
268            self.annotation.to_string(),
269            self.state,
270            self.command_line
271        )
272    }
273}
274
275impl Job {
276    /// Returns a new job object.
277    ///
278    /// # Arguments
279    ///
280    /// * `children` - The job's known child processes.
281    /// * `command_line` - The command line of the job.
282    /// * `state` - The current operational state of the job.
283    pub(crate) fn new<I>(tasks: I, command_line: String, state: JobState) -> Self
284    where
285        I: IntoIterator<Item = JobTask>,
286    {
287        Self {
288            id: 0,
289            tasks: tasks.into_iter().collect(),
290            pgid: None,
291            annotation: JobAnnotation::None,
292            command_line,
293            state,
294        }
295    }
296
297    /// Returns a pid-style string for the job.
298    pub fn to_pid_style_string(&self) -> String {
299        let display_pid = self
300            .representative_pid()
301            .map_or_else(|| String::from("<pid unknown>"), |pid| pid.to_string());
302        std::format!("[{}]{}\t{}", self.id, self.annotation, display_pid)
303    }
304
305    /// Returns the annotation of the job.
306    pub fn annotation(&self) -> JobAnnotation {
307        self.annotation.clone()
308    }
309
310    /// Returns the command name of the job.
311    pub fn command_name(&self) -> &str {
312        self.command_line
313            .split_ascii_whitespace()
314            .next()
315            .unwrap_or_default()
316    }
317
318    /// Returns whether the job is the current job.
319    pub const fn is_current(&self) -> bool {
320        matches!(self.annotation, JobAnnotation::Current)
321    }
322
323    /// Returns whether the job is the previous job.
324    pub const fn is_prev(&self) -> bool {
325        matches!(self.annotation, JobAnnotation::Previous)
326    }
327
328    /// Polls whether the job has completed.
329    pub fn poll_done(
330        &mut self,
331    ) -> Result<Option<Result<ExecutionResult, error::Error>>, error::Error> {
332        let mut result: Option<Result<ExecutionResult, error::Error>> = None;
333
334        tracing::debug!(target: trace_categories::JOBS, "Polling job {} for completion...", self.id);
335
336        while !self.tasks.is_empty() {
337            let task = &mut self.tasks[0];
338            match task.poll() {
339                Some(r) => {
340                    self.tasks.remove(0);
341                    result = Some(r);
342                }
343                None => {
344                    return Ok(None);
345                }
346            }
347        }
348
349        tracing::debug!(target: trace_categories::JOBS, "Job {} has completed.", self.id);
350
351        self.state = JobState::Done;
352
353        Ok(result)
354    }
355
356    /// Waits for the job to complete.
357    pub async fn wait(&mut self) -> Result<ExecutionResult, error::Error> {
358        let mut result = ExecutionResult::success();
359
360        while let Some(task) = self.tasks.back_mut() {
361            match task.wait().await? {
362                JobTaskWaitResult::Completed(execution_result) => {
363                    result = execution_result;
364                    self.tasks.pop_back();
365                }
366                JobTaskWaitResult::Stopped => {
367                    self.state = JobState::Stopped;
368                    return Ok(ExecutionResult::stopped());
369                }
370            }
371        }
372
373        self.state = JobState::Done;
374
375        Ok(result)
376    }
377
378    /// Moves the job to execute in the background.
379    pub fn move_to_background(&mut self) -> Result<(), error::Error> {
380        if matches!(self.state, JobState::Stopped) {
381            if let Some(pgid) = self.process_group_id() {
382                sys::signal::continue_process(pgid)?;
383                self.state = JobState::Running;
384                Ok(())
385            } else {
386                Err(error::ErrorKind::FailedToSendSignal.into())
387            }
388        } else {
389            error::unimp("move job to background")
390        }
391    }
392
393    /// Moves the job to execute in the foreground.
394    pub fn move_to_foreground(&mut self) -> Result<(), error::Error> {
395        if matches!(self.state, JobState::Stopped) {
396            if let Some(pgid) = self.process_group_id() {
397                sys::signal::continue_process(pgid)?;
398                self.state = JobState::Running;
399            } else {
400                return Err(error::ErrorKind::FailedToSendSignal.into());
401            }
402        }
403
404        if let Some(pgid) = self.process_group_id() {
405            sys::terminal::move_to_foreground(pgid)?;
406        }
407
408        Ok(())
409    }
410
411    /// Kills the job.
412    ///
413    /// # Arguments
414    ///
415    /// * `signal` - The signal to send to the job.
416    pub fn kill(&self, signal: traps::TrapSignal) -> Result<(), error::Error> {
417        if let Some(pid) = self.process_group_id() {
418            sys::signal::kill_process(pid, signal)
419        } else {
420            Err(error::ErrorKind::FailedToSendSignal.into())
421        }
422    }
423
424    /// Tries to retrieve a "representative" pid for the job.
425    pub fn representative_pid(&self) -> Option<sys::process::ProcessId> {
426        for task in &self.tasks {
427            match task {
428                JobTask::External(p) => {
429                    if let Some(pid) = p.pid() {
430                        return Some(pid);
431                    }
432                }
433                JobTask::Internal(_) => (),
434            }
435        }
436        None
437    }
438
439    /// Tries to retrieve the process group ID (PGID) of the job.
440    pub fn process_group_id(&self) -> Option<sys::process::ProcessId> {
441        // TODO: Don't assume that the first PID is the PGID.
442        self.pgid.or_else(|| self.representative_pid())
443    }
444}