1use std::collections::VecDeque;
4use std::fmt::Display;
5
6use futures::FutureExt;
7
8use crate::ExecutionResult;
9use crate::error;
10use crate::processes;
11use crate::sys;
12use crate::trace_categories;
13use crate::traps;
14
15pub(crate) type JobJoinHandle = tokio::task::JoinHandle<Result<ExecutionResult, error::Error>>;
16pub(crate) type JobResult = (Job, Result<ExecutionResult, error::Error>);
17
18#[derive(Default)]
20pub struct JobManager {
21 pub jobs: Vec<Job>,
23}
24
25pub enum JobTask {
27 External(processes::ChildProcess),
29 Internal(JobJoinHandle),
31}
32
33pub enum JobTaskWaitResult {
35 Completed(ExecutionResult),
37 Stopped,
39}
40
41impl JobTask {
42 pub const fn is_external(&self) -> bool {
44 matches!(self, Self::External(_))
45 }
46
47 pub async fn wait(&mut self) -> Result<JobTaskWaitResult, error::Error> {
49 match self {
50 Self::External(process) => {
51 let wait_result = process.wait().await?;
52 match wait_result {
53 processes::ProcessWaitResult::Completed(output) => {
54 Ok(JobTaskWaitResult::Completed(output.into()))
55 }
56 processes::ProcessWaitResult::Stopped => Ok(JobTaskWaitResult::Stopped),
57 }
58 }
59 Self::Internal(handle) => Ok(JobTaskWaitResult::Completed(handle.await??)),
60 }
61 }
62
63 fn poll(&mut self) -> Option<Result<ExecutionResult, error::Error>> {
68 match self {
69 Self::External(process) => {
70 let check_result = process.poll();
71 check_result.map(|polled_result| polled_result.map(|output| output.into()))
72 }
73 Self::Internal(handle) => {
74 let checkable_handle = handle;
75 checkable_handle.now_or_never().and_then(|r| r.ok())
76 }
77 }
78 }
79}
80
81impl JobManager {
82 pub fn new() -> Self {
84 Self::default()
85 }
86
87 #[allow(
94 clippy::missing_panics_doc,
95 reason = "push() guarantees the vector length is >= 1"
96 )]
97 pub fn add_as_current(&mut self, mut job: Job) -> &Job {
98 for j in &mut self.jobs {
99 if matches!(j.annotation, JobAnnotation::Current) {
100 j.annotation = JobAnnotation::Previous;
101 break;
102 }
103 }
104
105 let id = self.jobs.len() + 1;
106 job.id = id;
107 job.annotation = JobAnnotation::Current;
108 self.jobs.push(job);
109
110 #[allow(clippy::unwrap_used, reason = "we just pushed an element")]
111 self.jobs.last().unwrap()
112 }
113
114 pub fn current_job(&self) -> Option<&Job> {
116 self.jobs
117 .iter()
118 .find(|j| matches!(j.annotation, JobAnnotation::Current))
119 }
120
121 pub fn current_job_mut(&mut self) -> Option<&mut Job> {
123 self.jobs
124 .iter_mut()
125 .find(|j| matches!(j.annotation, JobAnnotation::Current))
126 }
127
128 pub fn prev_job(&self) -> Option<&Job> {
130 self.jobs
131 .iter()
132 .find(|j| matches!(j.annotation, JobAnnotation::Previous))
133 }
134
135 pub fn prev_job_mut(&mut self) -> Option<&mut Job> {
137 self.jobs
138 .iter_mut()
139 .find(|j| matches!(j.annotation, JobAnnotation::Previous))
140 }
141
142 pub fn resolve_job_spec(&mut self, job_spec: &str) -> Option<&mut Job> {
148 let remainder = job_spec.strip_prefix('%')?;
149
150 match remainder {
151 "%" | "+" => self.current_job_mut(),
152 "-" => self.prev_job_mut(),
153 s if s.chars().all(char::is_numeric) => {
154 let id = s.parse::<usize>().ok()?;
155 self.jobs.iter_mut().find(|j| j.id == id)
156 }
157 _ => {
158 tracing::warn!(target: trace_categories::UNIMPLEMENTED, "unimplemented: job spec naming command: '{job_spec}'");
159 None
160 }
161 }
162 }
163
164 pub async fn wait_all(&mut self) -> Result<Vec<Job>, error::Error> {
166 for job in &mut self.jobs {
167 job.wait().await?;
168 }
169
170 Ok(self.sweep_completed_jobs())
171 }
172
173 pub fn poll(&mut self) -> Result<Vec<JobResult>, error::Error> {
175 let mut results = vec![];
176
177 let mut i = 0;
178 while i != self.jobs.len() {
179 if let Some(result) = self.jobs[i].poll_done()? {
180 let job = self.jobs.remove(i);
181 results.push((job, result));
182 } else if matches!(self.jobs[i].state, JobState::Done) {
183 results.push((self.jobs.remove(i), Ok(ExecutionResult::success())));
186 } else {
187 i += 1;
188 }
189 }
190
191 Ok(results)
192 }
193
194 fn sweep_completed_jobs(&mut self) -> Vec<Job> {
195 let mut completed_jobs = vec![];
196
197 let mut i = 0;
198 while i != self.jobs.len() {
199 if self.jobs[i].tasks.is_empty() {
200 completed_jobs.push(self.jobs.remove(i));
201 } else {
202 i += 1;
203 }
204 }
205
206 completed_jobs
207 }
208}
209
210#[derive(Clone)]
212pub enum JobState {
213 Unknown,
215 Running,
217 Stopped,
219 Done,
221}
222
223impl Display for JobState {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 match self {
226 Self::Unknown => write!(f, "Unknown"),
227 Self::Running => write!(f, "Running"),
228 Self::Stopped => write!(f, "Stopped"),
229 Self::Done => write!(f, "Done"),
230 }
231 }
232}
233
234#[derive(Clone)]
236pub enum JobAnnotation {
237 None,
239 Current,
241 Previous,
243}
244
245impl Display for JobAnnotation {
246 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 match self {
248 Self::None => write!(f, ""),
249 Self::Current => write!(f, "+"),
250 Self::Previous => write!(f, "-"),
251 }
252 }
253}
254
255pub struct Job {
257 tasks: VecDeque<JobTask>,
259
260 pgid: Option<sys::process::ProcessId>,
262
263 annotation: JobAnnotation,
265
266 pub id: usize,
268
269 pub command_line: String,
271
272 pub state: JobState,
274}
275
276impl Display for Job {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 write!(
279 f,
280 "[{}]{:3}{}\t{}",
281 self.id,
282 self.annotation.to_string(),
283 self.state,
284 self.command_line
285 )
286 }
287}
288
289impl Job {
290 pub(crate) fn new<I>(tasks: I, command_line: String, state: JobState) -> Self
298 where
299 I: IntoIterator<Item = JobTask>,
300 {
301 Self {
302 id: 0,
303 tasks: tasks.into_iter().collect(),
304 pgid: None,
305 annotation: JobAnnotation::None,
306 command_line,
307 state,
308 }
309 }
310
311 pub fn to_pid_style_string(&self) -> String {
313 let display_pid = self
314 .representative_pid()
315 .map_or_else(|| String::from("<pid unknown>"), |pid| pid.to_string());
316 std::format!("[{}]{}\t{}", self.id, self.annotation, display_pid)
317 }
318
319 pub fn annotation(&self) -> JobAnnotation {
321 self.annotation.clone()
322 }
323
324 pub fn command_name(&self) -> &str {
326 self.command_line
327 .split_ascii_whitespace()
328 .next()
329 .unwrap_or_default()
330 }
331
332 pub const fn is_current(&self) -> bool {
334 matches!(self.annotation, JobAnnotation::Current)
335 }
336
337 pub const fn is_prev(&self) -> bool {
339 matches!(self.annotation, JobAnnotation::Previous)
340 }
341
342 pub fn poll_done(
344 &mut self,
345 ) -> Result<Option<Result<ExecutionResult, error::Error>>, error::Error> {
346 let mut result: Option<Result<ExecutionResult, error::Error>> = None;
347
348 tracing::debug!(target: trace_categories::JOBS, "Polling job {} for completion...", self.id);
349
350 while !self.tasks.is_empty() {
351 let task = &mut self.tasks[0];
352 match task.poll() {
353 Some(r) => {
354 self.tasks.remove(0);
355 result = Some(r);
356 }
357 None => {
358 return Ok(None);
359 }
360 }
361 }
362
363 tracing::debug!(target: trace_categories::JOBS, "Job {} has completed.", self.id);
364
365 self.state = JobState::Done;
366
367 Ok(result)
368 }
369
370 pub async fn wait(&mut self) -> Result<ExecutionResult, error::Error> {
372 let mut result = ExecutionResult::success();
373
374 while let Some(task) = self.tasks.back_mut() {
375 match task.wait().await? {
376 JobTaskWaitResult::Completed(execution_result) => {
377 result = execution_result;
378 self.tasks.pop_back();
379 }
380 JobTaskWaitResult::Stopped => {
381 self.state = JobState::Stopped;
382 return Ok(ExecutionResult::stopped());
383 }
384 }
385 }
386
387 self.state = JobState::Done;
388
389 Ok(result)
390 }
391
392 pub fn move_to_background(&mut self) -> Result<(), error::Error> {
394 if matches!(self.state, JobState::Stopped) {
395 if let Some(pgid) = self.process_group_id() {
396 sys::signal::continue_process(pgid)?;
397 self.state = JobState::Running;
398 Ok(())
399 } else {
400 Err(error::ErrorKind::FailedToSendSignal.into())
401 }
402 } else {
403 error::unimp("move job to background")
404 }
405 }
406
407 pub fn move_to_foreground(&mut self) -> Result<(), error::Error> {
409 if matches!(self.state, JobState::Stopped) {
410 if let Some(pgid) = self.process_group_id() {
411 sys::signal::continue_process(pgid)?;
412 self.state = JobState::Running;
413 } else {
414 return Err(error::ErrorKind::FailedToSendSignal.into());
415 }
416 }
417
418 if let Some(pgid) = self.process_group_id() {
419 sys::terminal::move_to_foreground(pgid)?;
420 }
421
422 Ok(())
423 }
424
425 pub fn kill(&self, signal: traps::TrapSignal) -> Result<(), error::Error> {
431 if let Some(pid) = self.process_group_id() {
432 sys::signal::kill_process(pid, signal)
433 } else {
434 Err(error::ErrorKind::FailedToSendSignal.into())
435 }
436 }
437
438 pub fn representative_pid(&self) -> Option<sys::process::ProcessId> {
440 for task in &self.tasks {
441 match task {
442 JobTask::External(p) => {
443 if let Some(pid) = p.pid() {
444 return Some(pid);
445 }
446 }
447 JobTask::Internal(_) => (),
448 }
449 }
450 None
451 }
452
453 pub fn process_group_id(&self) -> Option<sys::process::ProcessId> {
455 self.pgid.or_else(|| self.representative_pid())
457 }
458}