1use std::collections::VecDeque;
4use std::fmt::Display;
5
6use futures::FutureExt;
7
8use crate::ExecutionResult;
9use crate::error;
10use crate::processes;
11use crate::sys;
12use crate::trace_categories;
13use crate::traps;
14
15pub(crate) type JobJoinHandle = tokio::task::JoinHandle<Result<ExecutionResult, error::Error>>;
16pub(crate) type JobResult = (Job, Result<ExecutionResult, error::Error>);
17
18#[derive(Default)]
20pub struct JobManager {
21 pub jobs: Vec<Job>,
23}
24
25pub enum JobTask {
27 External(processes::ChildProcess),
29 Internal(JobJoinHandle),
31}
32
33pub enum JobTaskWaitResult {
35 Completed(ExecutionResult),
37 Stopped,
39}
40
41impl JobTask {
42 pub async fn wait(&mut self) -> Result<JobTaskWaitResult, error::Error> {
44 match self {
45 Self::External(process) => {
46 let wait_result = process.wait().await?;
47 match wait_result {
48 processes::ProcessWaitResult::Completed(output) => {
49 Ok(JobTaskWaitResult::Completed(output.into()))
50 }
51 processes::ProcessWaitResult::Stopped => Ok(JobTaskWaitResult::Stopped),
52 }
53 }
54 Self::Internal(handle) => Ok(JobTaskWaitResult::Completed(handle.await??)),
55 }
56 }
57
58 #[allow(clippy::unwrap_in_result)]
59 fn poll(&mut self) -> Option<Result<ExecutionResult, error::Error>> {
60 match self {
61 Self::External(process) => {
62 let check_result = process.poll();
63 check_result.map(|polled_result| polled_result.map(|output| output.into()))
64 }
65 Self::Internal(handle) => {
66 let checkable_handle = handle;
67 checkable_handle.now_or_never().map(|r| r.unwrap())
68 }
69 }
70 }
71}
72
73impl JobManager {
74 pub fn new() -> Self {
76 Self::default()
77 }
78
79 pub fn add_as_current(&mut self, mut job: Job) -> &Job {
86 for j in &mut self.jobs {
87 if matches!(j.annotation, JobAnnotation::Current) {
88 j.annotation = JobAnnotation::Previous;
89 break;
90 }
91 }
92
93 let id = self.jobs.len() + 1;
94 job.id = id;
95 job.annotation = JobAnnotation::Current;
96 self.jobs.push(job);
97 self.jobs.last().unwrap()
98 }
99
100 pub fn current_job(&self) -> Option<&Job> {
102 self.jobs
103 .iter()
104 .find(|j| matches!(j.annotation, JobAnnotation::Current))
105 }
106
107 pub fn current_job_mut(&mut self) -> Option<&mut Job> {
109 self.jobs
110 .iter_mut()
111 .find(|j| matches!(j.annotation, JobAnnotation::Current))
112 }
113
114 pub fn prev_job(&self) -> Option<&Job> {
116 self.jobs
117 .iter()
118 .find(|j| matches!(j.annotation, JobAnnotation::Previous))
119 }
120
121 pub fn prev_job_mut(&mut self) -> Option<&mut Job> {
123 self.jobs
124 .iter_mut()
125 .find(|j| matches!(j.annotation, JobAnnotation::Previous))
126 }
127
128 pub fn resolve_job_spec(&mut self, job_spec: &str) -> Option<&mut Job> {
134 let remainder = job_spec.strip_prefix('%')?;
135
136 match remainder {
137 "%" | "+" => self.current_job_mut(),
138 "-" => self.prev_job_mut(),
139 s if s.chars().all(char::is_numeric) => {
140 let id = s.parse::<usize>().ok()?;
141 self.jobs.iter_mut().find(|j| j.id == id)
142 }
143 _ => {
144 tracing::warn!(target: trace_categories::UNIMPLEMENTED, "unimplemented: job spec naming command: '{job_spec}'");
145 None
146 }
147 }
148 }
149
150 pub async fn wait_all(&mut self) -> Result<Vec<Job>, error::Error> {
152 for job in &mut self.jobs {
153 job.wait().await?;
154 }
155
156 Ok(self.sweep_completed_jobs())
157 }
158
159 pub fn poll(&mut self) -> Result<Vec<JobResult>, error::Error> {
161 let mut results = vec![];
162
163 let mut i = 0;
164 while i != self.jobs.len() {
165 if let Some(result) = self.jobs[i].poll_done()? {
166 let job = self.jobs.remove(i);
167 results.push((job, result));
168 } else if matches!(self.jobs[i].state, JobState::Done) {
169 results.push((self.jobs.remove(i), Ok(ExecutionResult::success())));
172 } else {
173 i += 1;
174 }
175 }
176
177 Ok(results)
178 }
179
180 fn sweep_completed_jobs(&mut self) -> Vec<Job> {
181 let mut completed_jobs = vec![];
182
183 let mut i = 0;
184 while i != self.jobs.len() {
185 if self.jobs[i].tasks.is_empty() {
186 completed_jobs.push(self.jobs.remove(i));
187 } else {
188 i += 1;
189 }
190 }
191
192 completed_jobs
193 }
194}
195
/// The execution state recorded for a job.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JobState {
    /// The state has not been determined.
    Unknown,
    /// The job is running.
    Running,
    /// The job has been stopped.
    Stopped,
    /// The job has finished.
    Done,
}
208
209impl Display for JobState {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 match self {
212 Self::Unknown => write!(f, "Unknown"),
213 Self::Running => write!(f, "Running"),
214 Self::Stopped => write!(f, "Stopped"),
215 Self::Done => write!(f, "Done"),
216 }
217 }
218}
219
/// Marker describing a job's standing relative to the other tracked jobs.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JobAnnotation {
    /// The job carries no marker.
    None,
    /// The job is the current job (displayed as `+`).
    Current,
    /// The job is the previous job (displayed as `-`).
    Previous,
}
230
231impl Display for JobAnnotation {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 match self {
234 Self::None => write!(f, ""),
235 Self::Current => write!(f, "+"),
236 Self::Previous => write!(f, "-"),
237 }
238 }
239}
240
241pub struct Job {
243 tasks: VecDeque<JobTask>,
245
246 pgid: Option<sys::process::ProcessId>,
248
249 annotation: JobAnnotation,
251
252 pub id: usize,
254
255 pub command_line: String,
257
258 pub state: JobState,
260}
261
262impl Display for Job {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 write!(
265 f,
266 "[{}]{:3}{}\t{}",
267 self.id,
268 self.annotation.to_string(),
269 self.state,
270 self.command_line
271 )
272 }
273}
274
275impl Job {
276 pub(crate) fn new<I>(tasks: I, command_line: String, state: JobState) -> Self
284 where
285 I: IntoIterator<Item = JobTask>,
286 {
287 Self {
288 id: 0,
289 tasks: tasks.into_iter().collect(),
290 pgid: None,
291 annotation: JobAnnotation::None,
292 command_line,
293 state,
294 }
295 }
296
297 pub fn to_pid_style_string(&self) -> String {
299 let display_pid = self
300 .representative_pid()
301 .map_or_else(|| String::from("<pid unknown>"), |pid| pid.to_string());
302 std::format!("[{}]{}\t{}", self.id, self.annotation, display_pid)
303 }
304
305 pub fn annotation(&self) -> JobAnnotation {
307 self.annotation.clone()
308 }
309
310 pub fn command_name(&self) -> &str {
312 self.command_line
313 .split_ascii_whitespace()
314 .next()
315 .unwrap_or_default()
316 }
317
318 pub const fn is_current(&self) -> bool {
320 matches!(self.annotation, JobAnnotation::Current)
321 }
322
323 pub const fn is_prev(&self) -> bool {
325 matches!(self.annotation, JobAnnotation::Previous)
326 }
327
328 pub fn poll_done(
330 &mut self,
331 ) -> Result<Option<Result<ExecutionResult, error::Error>>, error::Error> {
332 let mut result: Option<Result<ExecutionResult, error::Error>> = None;
333
334 tracing::debug!(target: trace_categories::JOBS, "Polling job {} for completion...", self.id);
335
336 while !self.tasks.is_empty() {
337 let task = &mut self.tasks[0];
338 match task.poll() {
339 Some(r) => {
340 self.tasks.remove(0);
341 result = Some(r);
342 }
343 None => {
344 return Ok(None);
345 }
346 }
347 }
348
349 tracing::debug!(target: trace_categories::JOBS, "Job {} has completed.", self.id);
350
351 self.state = JobState::Done;
352
353 Ok(result)
354 }
355
356 pub async fn wait(&mut self) -> Result<ExecutionResult, error::Error> {
358 let mut result = ExecutionResult::success();
359
360 while let Some(task) = self.tasks.back_mut() {
361 match task.wait().await? {
362 JobTaskWaitResult::Completed(execution_result) => {
363 result = execution_result;
364 self.tasks.pop_back();
365 }
366 JobTaskWaitResult::Stopped => {
367 self.state = JobState::Stopped;
368 return Ok(ExecutionResult::stopped());
369 }
370 }
371 }
372
373 self.state = JobState::Done;
374
375 Ok(result)
376 }
377
378 pub fn move_to_background(&mut self) -> Result<(), error::Error> {
380 if matches!(self.state, JobState::Stopped) {
381 if let Some(pgid) = self.process_group_id() {
382 sys::signal::continue_process(pgid)?;
383 self.state = JobState::Running;
384 Ok(())
385 } else {
386 Err(error::ErrorKind::FailedToSendSignal.into())
387 }
388 } else {
389 error::unimp("move job to background")
390 }
391 }
392
393 pub fn move_to_foreground(&mut self) -> Result<(), error::Error> {
395 if matches!(self.state, JobState::Stopped) {
396 if let Some(pgid) = self.process_group_id() {
397 sys::signal::continue_process(pgid)?;
398 self.state = JobState::Running;
399 } else {
400 return Err(error::ErrorKind::FailedToSendSignal.into());
401 }
402 }
403
404 if let Some(pgid) = self.process_group_id() {
405 sys::terminal::move_to_foreground(pgid)?;
406 }
407
408 Ok(())
409 }
410
411 pub fn kill(&self, signal: traps::TrapSignal) -> Result<(), error::Error> {
417 if let Some(pid) = self.process_group_id() {
418 sys::signal::kill_process(pid, signal)
419 } else {
420 Err(error::ErrorKind::FailedToSendSignal.into())
421 }
422 }
423
424 pub fn representative_pid(&self) -> Option<sys::process::ProcessId> {
426 for task in &self.tasks {
427 match task {
428 JobTask::External(p) => {
429 if let Some(pid) = p.pid() {
430 return Some(pid);
431 }
432 }
433 JobTask::Internal(_) => (),
434 }
435 }
436 None
437 }
438
439 pub fn process_group_id(&self) -> Option<sys::process::ProcessId> {
441 self.pgid.or_else(|| self.representative_pid())
443 }
444}