nu_command/experimental/
job_recv.rs1use std::{
2 sync::mpsc::{RecvTimeoutError, TryRecvError},
3 time::Duration,
4};
5
6use nu_utils::time::Instant;
7
8use nu_engine::command_prelude::*;
9
10use nu_protocol::{
11 Signals,
12 engine::{FilterTag, Mailbox},
13};
14
/// Command type implementing `job recv`.
///
/// The type carries no state; all behavior lives in its `Command` impl.
#[derive(Clone)]
pub struct JobRecv;
17
/// Longest single wait on the mailbox between two checks of the interrupt
/// signal; the blocking receive loops wake up at least this often.
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
19
20impl Command for JobRecv {
21 fn name(&self) -> &str {
22 "job recv"
23 }
24
25 fn description(&self) -> &str {
26 "Read a message from a job's mailbox."
27 }
28
29 fn extra_description(&self) -> &str {
30 r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
31This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
32
33Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
34If no tag is specified, this command will accept any message.
35
36If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
37By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
38If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
39
40Note: When using par-each, only one thread at a time can utilize this command.
41In the case of two or more threads running this command, they will wait until other threads are done using it,
42in no particular order, regardless of the specified timeout parameter.
43"#
44 }
45
46 fn signature(&self) -> nu_protocol::Signature {
47 Signature::build("job recv")
48 .category(Category::Experimental)
49 .named("tag", SyntaxShape::Int, "A tag for the message.", None)
50 .named(
51 "timeout",
52 SyntaxShape::Duration,
53 "The maximum time duration to wait for.",
54 None,
55 )
56 .input_output_types(vec![(Type::Nothing, Type::Any)])
57 .allow_variants_without_examples(true)
58 }
59
60 fn search_terms(&self) -> Vec<&str> {
61 vec!["receive"]
62 }
63
64 fn run(
65 &self,
66 engine_state: &EngineState,
67 stack: &mut Stack,
68 call: &Call,
69 _input: PipelineData,
70 ) -> Result<PipelineData, ShellError> {
71 let head = call.head;
72
73 let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
74
75 if let Some(tag) = tag_arg
76 && tag.item < 0
77 {
78 return Err(ShellError::NeedsPositiveValue { span: tag.span });
79 }
80
81 let tag = tag_arg.map(|it| it.item as FilterTag);
82
83 let timeout: Option<Duration> = call.get_flag(engine_state, stack, "timeout")?;
84
85 let mut mailbox = engine_state
86 .current_job
87 .mailbox
88 .lock()
89 .expect("failed to acquire lock");
90
91 if let Some(timeout) = timeout {
92 if timeout == Duration::ZERO {
93 recv_instantly(&mut mailbox, tag, head)
94 } else {
95 recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
96 }
97 } else {
98 recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
99 }
100 }
101
102 fn examples(&self) -> Vec<Example<'_>> {
103 vec![
104 Example {
105 example: "job recv",
106 description: "Block the current thread while no message arrives.",
107 result: None,
108 },
109 Example {
110 example: "job recv --timeout 10sec",
111 description: "Receive a message, wait for at most 10 seconds.",
112 result: None,
113 },
114 Example {
115 example: "job recv --timeout 0sec",
116 description: "Get a message or fail if no message is available immediately.",
117 result: None,
118 },
119 Example {
120 example: "job spawn { sleep 1sec; 'hi' | job send 0 }; job recv",
121 description: "Receive a message from a newly-spawned job.",
122 result: None,
123 },
124 ]
125 }
126}
127
128fn recv_without_time_limit(
129 mailbox: &mut Mailbox,
130 tag: Option<FilterTag>,
131 signals: &Signals,
132 span: Span,
133) -> Result<PipelineData, ShellError> {
134 loop {
135 if signals.interrupted() {
136 return Err(ShellError::Interrupted { span });
137 }
138 match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
139 Ok(value) => return Ok(value),
140 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
142 }
143 }
144}
145
146fn recv_instantly(
147 mailbox: &mut Mailbox,
148 tag: Option<FilterTag>,
149 span: Span,
150) -> Result<PipelineData, ShellError> {
151 match mailbox.try_recv(tag) {
152 Ok(value) => Ok(value),
153 Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
154 Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
155 }
156}
157
158fn recv_with_time_limit(
159 mailbox: &mut Mailbox,
160 tag: Option<FilterTag>,
161 signals: &Signals,
162 span: Span,
163 timeout: Duration,
164) -> Result<PipelineData, ShellError> {
165 let deadline = Instant::now() + timeout;
166
167 loop {
168 if signals.interrupted() {
169 return Err(ShellError::Interrupted { span });
170 }
171
172 let time_until_deadline = deadline.saturating_duration_since(Instant::now());
173
174 let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
175
176 match mailbox.recv_timeout(tag, time_to_sleep) {
177 Ok(value) => return Ok(value),
178 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
180 }
181
182 if time_until_deadline.is_zero() {
183 return Err(JobError::RecvTimeout { span }.into());
184 }
185 }
186}