nu_command/experimental/
job_spawn.rs

1use std::{
2    sync::{
3        Arc, Mutex,
4        atomic::{AtomicBool, AtomicU32},
5        mpsc,
6    },
7    thread,
8};
9
10use nu_engine::{ClosureEvalOnce, command_prelude::*};
11use nu_protocol::{
12    OutDest, Signals,
13    engine::{Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob},
14    report_shell_error,
15};
16
17#[derive(Clone)]
18pub struct JobSpawn;
19
20impl Command for JobSpawn {
21    fn name(&self) -> &str {
22        "job spawn"
23    }
24
25    fn description(&self) -> &str {
26        "Spawn a background job and retrieve its ID."
27    }
28
29    fn signature(&self) -> nu_protocol::Signature {
30        Signature::build("job spawn")
31            .category(Category::Experimental)
32            .input_output_types(vec![(Type::Nothing, Type::Int)])
33            .named(
34                "tag",
35                SyntaxShape::String,
36                "An optional description tag for this job",
37                Some('t'),
38            )
39            .required(
40                "closure",
41                SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
42                "The closure to run in another thread.",
43            )
44    }
45
46    fn search_terms(&self) -> Vec<&str> {
47        vec!["background", "bg", "&"]
48    }
49
50    fn run(
51        &self,
52        engine_state: &EngineState,
53        stack: &mut Stack,
54        call: &Call,
55        _input: PipelineData,
56    ) -> Result<PipelineData, ShellError> {
57        let head = call.head;
58
59        let closure: Closure = call.req(engine_state, stack, 0)?;
60
61        let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
62        let job_stack = stack.clone();
63
64        let mut job_state = engine_state.clone();
65        job_state.is_interactive = false;
66
67        // the new job should have its ctrl-c independent of foreground
68        let job_signals = Signals::new(Arc::new(AtomicBool::new(false)));
69        job_state.set_signals(job_signals.clone());
70
71        // the new job has a separate process group state for its processes
72        job_state.pipeline_externals_state = Arc::new((AtomicU32::new(0), AtomicU32::new(0)));
73
74        job_state.exit_warning_given = Arc::new(AtomicBool::new(false));
75
76        let jobs = job_state.jobs.clone();
77        let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
78
79        let (send, recv) = mpsc::channel();
80
81        let id = {
82            let thread_job = ThreadJob::new(job_signals, tag, send);
83
84            let id = jobs.add_job(Job::Thread(thread_job.clone()));
85
86            job_state.current_job = CurrentJob {
87                id,
88                background_thread_job: Some(thread_job),
89                mailbox: Arc::new(Mutex::new(Mailbox::new(recv))),
90            };
91
92            id
93        };
94
95        let result = thread::Builder::new()
96            .name(format!("background job {}", id.get()))
97            .spawn(move || {
98                let mut stack = job_stack.reset_pipes();
99                let stack = stack.push_redirection(
100                    Some(Redirection::Pipe(OutDest::Null)),
101                    Some(Redirection::Pipe(OutDest::Null)),
102                );
103                ClosureEvalOnce::new_preserve_out_dest(&job_state, &stack, closure)
104                    .run_with_input(Value::nothing(head).into_pipeline_data())
105                    .and_then(|data| data.drain())
106                    .unwrap_or_else(|err| {
107                        if !job_state.signals().interrupted() {
108                            report_shell_error(&job_state, &err);
109                        }
110                    });
111
112                {
113                    let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");
114
115                    jobs.remove_job(id);
116                }
117            });
118
119        match result {
120            Ok(_) => Ok(Value::int(id.get() as i64, head).into_pipeline_data()),
121            Err(err) => {
122                jobs.remove_job(id);
123                Err(ShellError::Io(IoError::new_with_additional_context(
124                    err,
125                    call.head,
126                    None,
127                    "Failed to spawn thread for job",
128                )))
129            }
130        }
131    }
132
133    fn examples(&self) -> Vec<Example<'_>> {
134        vec![Example {
135            example: "job spawn { sleep 5sec; rm evidence.pdf }",
136            description: "Spawn a background job to do some time consuming work",
137            result: None,
138        }]
139    }
140
141    fn extra_description(&self) -> &str {
142        r#"Executes the provided closure in a background thread
143and registers this task in the background job table, which can be retrieved with `job list`.
144
145This command returns the job id of the newly created job.
146            "#
147    }
148}