nu_command/experimental/
job_spawn.rs1use std::{
2 sync::{
3 Arc, Mutex,
4 atomic::{AtomicBool, AtomicU32},
5 mpsc,
6 },
7 thread,
8};
9
10use nu_engine::{ClosureEvalOnce, command_prelude::*};
11use nu_protocol::{
12 OutDest, Signals,
13 engine::{Closure, CurrentJob, Job, Mailbox, Redirection, ThreadJob},
14 report_shell_error,
15};
16
17#[derive(Clone)]
18pub struct JobSpawn;
19
20impl Command for JobSpawn {
21 fn name(&self) -> &str {
22 "job spawn"
23 }
24
25 fn description(&self) -> &str {
26 "Spawn a background job and retrieve its ID."
27 }
28
29 fn signature(&self) -> nu_protocol::Signature {
30 Signature::build("job spawn")
31 .category(Category::Experimental)
32 .input_output_types(vec![(Type::Nothing, Type::Int)])
33 .named(
34 "tag",
35 SyntaxShape::String,
36 "An optional description tag for this job",
37 Some('t'),
38 )
39 .required(
40 "closure",
41 SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
42 "The closure to run in another thread.",
43 )
44 }
45
46 fn search_terms(&self) -> Vec<&str> {
47 vec!["background", "bg", "&"]
48 }
49
50 fn run(
51 &self,
52 engine_state: &EngineState,
53 stack: &mut Stack,
54 call: &Call,
55 _input: PipelineData,
56 ) -> Result<PipelineData, ShellError> {
57 let head = call.head;
58
59 let closure: Closure = call.req(engine_state, stack, 0)?;
60
61 let tag: Option<String> = call.get_flag(engine_state, stack, "tag")?;
62 let job_stack = stack.clone();
63
64 let mut job_state = engine_state.clone();
65 job_state.is_interactive = false;
66
67 let job_signals = Signals::new(Arc::new(AtomicBool::new(false)));
69 job_state.set_signals(job_signals.clone());
70
71 job_state.pipeline_externals_state = Arc::new((AtomicU32::new(0), AtomicU32::new(0)));
73
74 job_state.exit_warning_given = Arc::new(AtomicBool::new(false));
75
76 let jobs = job_state.jobs.clone();
77 let mut jobs = jobs.lock().expect("jobs lock is poisoned!");
78
79 let (send, recv) = mpsc::channel();
80
81 let id = {
82 let thread_job = ThreadJob::new(job_signals, tag, send);
83
84 let id = jobs.add_job(Job::Thread(thread_job.clone()));
85
86 job_state.current_job = CurrentJob {
87 id,
88 background_thread_job: Some(thread_job),
89 mailbox: Arc::new(Mutex::new(Mailbox::new(recv))),
90 };
91
92 id
93 };
94
95 let result = thread::Builder::new()
96 .name(format!("background job {}", id.get()))
97 .spawn(move || {
98 let mut stack = job_stack.reset_pipes();
99 let stack = stack.push_redirection(
100 Some(Redirection::Pipe(OutDest::Null)),
101 Some(Redirection::Pipe(OutDest::Null)),
102 );
103 ClosureEvalOnce::new_preserve_out_dest(&job_state, &stack, closure)
104 .run_with_input(Value::nothing(head).into_pipeline_data())
105 .and_then(|data| data.drain())
106 .unwrap_or_else(|err| {
107 if !job_state.signals().interrupted() {
108 report_shell_error(&job_state, &err);
109 }
110 });
111
112 {
113 let mut jobs = job_state.jobs.lock().expect("jobs lock is poisoned!");
114
115 jobs.remove_job(id);
116 }
117 });
118
119 match result {
120 Ok(_) => Ok(Value::int(id.get() as i64, head).into_pipeline_data()),
121 Err(err) => {
122 jobs.remove_job(id);
123 Err(ShellError::Io(IoError::new_with_additional_context(
124 err,
125 call.head,
126 None,
127 "Failed to spawn thread for job",
128 )))
129 }
130 }
131 }
132
133 fn examples(&self) -> Vec<Example<'_>> {
134 vec![Example {
135 example: "job spawn { sleep 5sec; rm evidence.pdf }",
136 description: "Spawn a background job to do some time consuming work",
137 result: None,
138 }]
139 }
140
141 fn extra_description(&self) -> &str {
142 r#"Executes the provided closure in a background thread
143and registers this task in the background job table, which can be retrieved with `job list`.
144
145This command returns the job id of the newly created job.
146 "#
147 }
148}