nu_command/experimental/
job_send.rs

1use nu_engine::command_prelude::*;
2use nu_protocol::{JobId, engine::FilterTag};
3
/// The `job send` command.
///
/// This is a stateless unit type; all behavior lives in its `Command`
/// implementation.
#[derive(Clone)]
pub struct JobSend;
6
7impl Command for JobSend {
8    fn name(&self) -> &str {
9        "job send"
10    }
11
12    fn description(&self) -> &str {
13        "Send a message to the mailbox of a job."
14    }
15
16    fn extra_description(&self) -> &str {
17        r#"
18This command sends a message to a background job, which can then read sent messages
19in a first-in-first-out fashion with `job recv`. When it does so, it may additionally specify a numeric filter tag,
20in which case it will only read messages sent with the exact same filter tag.
21In particular, the id 0 refers to the main/initial nushell thread.
22
23A message can be any nushell value, and streams are always collected before being sent.
24
25This command never blocks.
26"#
27    }
28
29    fn signature(&self) -> nu_protocol::Signature {
30        Signature::build("job send")
31            .category(Category::Experimental)
32            .required(
33                "id",
34                SyntaxShape::Int,
35                "The id of the job to send the message to.",
36            )
37            .named("tag", SyntaxShape::Int, "A tag for the message", None)
38            .input_output_types(vec![(Type::Any, Type::Nothing)])
39            .allow_variants_without_examples(true)
40    }
41
42    fn search_terms(&self) -> Vec<&str> {
43        vec![]
44    }
45
46    fn run(
47        &self,
48        engine_state: &EngineState,
49        stack: &mut Stack,
50        call: &Call,
51        input: PipelineData,
52    ) -> Result<PipelineData, ShellError> {
53        let head = call.head;
54
55        let id_arg: Spanned<usize> = call.req(engine_state, stack, 0)?;
56        let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
57
58        let id = JobId::new(id_arg.item);
59
60        if let Some(tag) = tag_arg
61            && tag.item < 0
62        {
63            return Err(ShellError::NeedsPositiveValue { span: tag.span });
64        }
65
66        let tag = tag_arg.map(|it| it.item as FilterTag);
67
68        if id == JobId::ZERO {
69            engine_state
70                .root_job_sender
71                .send((tag, input))
72                .expect("this should NEVER happen.");
73        } else {
74            let jobs = engine_state.jobs.lock().expect("failed to acquire lock");
75
76            if let Some(job) = jobs.lookup(id) {
77                match job {
78                    nu_protocol::engine::Job::Thread(thread_job) => {
79                        // it is ok to send this value while holding the lock, because
80                        // mail channels are always unbounded, so this send never blocks
81                        let _ = thread_job.sender.send((tag, input));
82                    }
83                    nu_protocol::engine::Job::Frozen(_) => {
84                        return Err(JobError::AlreadyFrozen {
85                            span: id_arg.span,
86                            id,
87                        }
88                        .into());
89                    }
90                }
91            } else {
92                return Err(JobError::NotFound {
93                    span: id_arg.span,
94                    id,
95                }
96                .into());
97            }
98        }
99
100        Ok(Value::nothing(head).into_pipeline_data())
101    }
102
103    fn examples(&self) -> Vec<Example<'_>> {
104        vec![
105            Example {
106                example: "let id = job spawn { job recv | save sent.txt }; 'hi' | job send $id",
107                description: "Send a message from the main thread to a newly-spawned job",
108                result: None,
109            },
110            Example {
111                example: "job spawn { sleep 1sec; 'hi' | job send 0 }; job recv",
112                description: "Send a message from a newly-spawned job to the main thread (which always has an ID of 0)",
113                result: None,
114            },
115        ]
116    }
117}