nu_command/experimental/
job_recv.rs1use std::{
2 sync::mpsc::{RecvTimeoutError, TryRecvError},
3 time::{Duration, Instant},
4};
5
6use nu_engine::command_prelude::*;
7
8use nu_protocol::{
9 Signals,
10 engine::{FilterTag, Mailbox},
11};
12
13#[derive(Clone)]
14pub struct JobRecv;
15
16const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
17
18impl Command for JobRecv {
19 fn name(&self) -> &str {
20 "job recv"
21 }
22
23 fn description(&self) -> &str {
24 "Read a message from the mailbox."
25 }
26
27 fn extra_description(&self) -> &str {
28 r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
29This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
30
31Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
32If no tag is specified, this command will accept any message.
33
34If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
35By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
36If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
37
38Note: When using par-each, only one thread at a time can utilize this command.
39In the case of two or more threads running this command, they will wait until other threads are done using it,
40in no particular order, regardless of the specified timeout parameter.
41"#
42 }
43
44 fn signature(&self) -> nu_protocol::Signature {
45 Signature::build("job recv")
46 .category(Category::Experimental)
47 .named("tag", SyntaxShape::Int, "A tag for the message", None)
48 .named(
49 "timeout",
50 SyntaxShape::Duration,
51 "The maximum time duration to wait for.",
52 None,
53 )
54 .input_output_types(vec![(Type::Nothing, Type::Any)])
55 .allow_variants_without_examples(true)
56 }
57
58 fn search_terms(&self) -> Vec<&str> {
59 vec!["receive"]
60 }
61
62 fn run(
63 &self,
64 engine_state: &EngineState,
65 stack: &mut Stack,
66 call: &Call,
67 _input: PipelineData,
68 ) -> Result<PipelineData, ShellError> {
69 let head = call.head;
70
71 let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
72
73 if let Some(tag) = tag_arg {
74 if tag.item < 0 {
75 return Err(ShellError::NeedsPositiveValue { span: tag.span });
76 }
77 }
78
79 let tag = tag_arg.map(|it| it.item as FilterTag);
80
81 let duration: Option<i64> = call.get_flag(engine_state, stack, "timeout")?;
82
83 let timeout = duration.map(|it| Duration::from_nanos(it as u64));
84
85 let mut mailbox = engine_state
86 .current_job
87 .mailbox
88 .lock()
89 .expect("failed to acquire lock");
90
91 if let Some(timeout) = timeout {
92 if timeout == Duration::ZERO {
93 recv_instantly(&mut mailbox, tag, head)
94 } else {
95 recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
96 }
97 } else {
98 recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
99 }
100 }
101
102 fn examples(&self) -> Vec<Example> {
103 vec![
104 Example {
105 example: "job recv",
106 description: "Block the current thread while no message arrives",
107 result: None,
108 },
109 Example {
110 example: "job recv --timeout 10sec",
111 description: "Receive a message, wait for at most 10 seconds.",
112 result: None,
113 },
114 Example {
115 example: "job recv --timeout 0sec",
116 description: "Get a message or fail if no message is available immediately",
117 result: None,
118 },
119 ]
120 }
121}
122
123fn recv_without_time_limit(
124 mailbox: &mut Mailbox,
125 tag: Option<FilterTag>,
126 signals: &Signals,
127 span: Span,
128) -> Result<PipelineData, ShellError> {
129 loop {
130 if signals.interrupted() {
131 return Err(ShellError::Interrupted { span });
132 }
133 match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
134 Ok(value) => return Ok(value),
135 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
137 }
138 }
139}
140
141fn recv_instantly(
142 mailbox: &mut Mailbox,
143 tag: Option<FilterTag>,
144 span: Span,
145) -> Result<PipelineData, ShellError> {
146 match mailbox.try_recv(tag) {
147 Ok(value) => Ok(value),
148 Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
149 Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
150 }
151}
152
153fn recv_with_time_limit(
154 mailbox: &mut Mailbox,
155 tag: Option<FilterTag>,
156 signals: &Signals,
157 span: Span,
158 timeout: Duration,
159) -> Result<PipelineData, ShellError> {
160 let deadline = Instant::now() + timeout;
161
162 loop {
163 if signals.interrupted() {
164 return Err(ShellError::Interrupted { span });
165 }
166
167 let time_until_deadline = deadline.saturating_duration_since(Instant::now());
168
169 let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
170
171 match mailbox.recv_timeout(tag, time_to_sleep) {
172 Ok(value) => return Ok(value),
173 Err(RecvTimeoutError::Timeout) => {} Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
175 }
176
177 if time_until_deadline.is_zero() {
178 return Err(JobError::RecvTimeout { span }.into());
179 }
180 }
181}