1use std::{
2 sync::mpsc::{RecvTimeoutError, TryRecvError},
3 time::{Duration, Instant},
4};
5
6use nu_engine::command_prelude::*;
7
8use nu_protocol::{
9 Signals,
10 engine::{FilterTag, Mailbox},
11};
12
/// The `job recv` command: reads a message from the current job's mailbox.
///
/// The type carries no state; all behavior lives in its `Command` implementation.
#[derive(Clone)]
pub struct JobRecv;
15
/// Upper bound on a single blocking wait on the mailbox.
///
/// The receive loops in this file wake up at least this often so they can check whether the
/// user has interrupted the command.
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
17
18impl Command for JobRecv {
19 fn name(&self) -> &str {
20 "job recv"
21 }
22
23 fn description(&self) -> &str {
24 "Read a message from the mailbox."
25 }
26
27 fn extra_description(&self) -> &str {
28 r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
29This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
30
31Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
32If no tag is specified, this command will accept any message.
33
34If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
35By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
36If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
37
38Note: When using par-each, only one thread at a time can utilize this command.
39In the case of two or more threads running this command, they will wait until other threads are done using it,
40in no particular order, regardless of the specified timeout parameter.
41"#
42 }
43
44 fn signature(&self) -> nu_protocol::Signature {
45 Signature::build("job recv")
46 .category(Category::Experimental)
47 .named("tag", SyntaxShape::Int, "A tag for the message", None)
48 .named(
49 "timeout",
50 SyntaxShape::Duration,
51 "The maximum time duration to wait for.",
52 None,
53 )
54 .input_output_types(vec![(Type::Nothing, Type::Any)])
55 .allow_variants_without_examples(true)
56 }
57
58 fn search_terms(&self) -> Vec<&str> {
59 vec!["receive"]
60 }
61
62 fn run(
63 &self,
64 engine_state: &EngineState,
65 stack: &mut Stack,
66 call: &Call,
67 _input: PipelineData,
68 ) -> Result<PipelineData, ShellError> {
69 let head = call.head;
70
71 let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
72
73 if let Some(tag) = tag_arg
74 && tag.item < 0
75 {
76 return Err(ShellError::NeedsPositiveValue { span: tag.span });
77 }
78
79 let tag = tag_arg.map(|it| it.item as FilterTag);
80
81 let timeout: Option<Duration> = call.get_flag(engine_state, stack, "timeout")?;
82
83 let mut mailbox = engine_state
84 .current_job
85 .mailbox
86 .lock()
87 .expect("failed to acquire lock");
88
89 if let Some(timeout) = timeout {
90 if timeout == Duration::ZERO {
91 recv_instantly(&mut mailbox, tag, head)
92 } else {
93 recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
94 }
95 } else {
96 recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
97 }
98 }
99
100 fn examples(&self) -> Vec<Example<'_>> {
101 vec![
102 Example {
103 example: "job recv",
104 description: "Block the current thread while no message arrives",
105 result: None,
106 },
107 Example {
108 example: "job recv --timeout 10sec",
109 description: "Receive a message, wait for at most 10 seconds.",
110 result: None,
111 },
112 Example {
113 example: "job recv --timeout 0sec",
114 description: "Get a message or fail if no message is available immediately",
115 result: None,
116 },
117 Example {
118 example: "job spawn { sleep 1sec; 'hi' | job send 0 }; job recv",
119 description: "Receive a message from a newly-spawned job",
120 result: None,
121 },
122 ]
123 }
124}
125
126fn recv_without_time_limit(
127 mailbox: &mut Mailbox,
128 tag: Option<FilterTag>,
129 signals: &Signals,
130 span: Span,
131) -> Result<PipelineData, ShellError> {
132 loop {
133 if signals.interrupted() {
134 return Err(ShellError::Interrupted { span });
135 }
136 match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
137 Ok(value) => return Ok(value),
138 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
140 }
141 }
142}
143
144fn recv_instantly(
145 mailbox: &mut Mailbox,
146 tag: Option<FilterTag>,
147 span: Span,
148) -> Result<PipelineData, ShellError> {
149 match mailbox.try_recv(tag) {
150 Ok(value) => Ok(value),
151 Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
152 Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
153 }
154}
155
156fn recv_with_time_limit(
157 mailbox: &mut Mailbox,
158 tag: Option<FilterTag>,
159 signals: &Signals,
160 span: Span,
161 timeout: Duration,
162) -> Result<PipelineData, ShellError> {
163 let deadline = Instant::now() + timeout;
164
165 loop {
166 if signals.interrupted() {
167 return Err(ShellError::Interrupted { span });
168 }
169
170 let time_until_deadline = deadline.saturating_duration_since(Instant::now());
171
172 let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
173
174 match mailbox.recv_timeout(tag, time_to_sleep) {
175 Ok(value) => return Ok(value),
176 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
178 }
179
180 if time_until_deadline.is_zero() {
181 return Err(JobError::RecvTimeout { span }.into());
182 }
183 }
184}