Skip to main content

nu_command/experimental/
job_recv.rs

1use std::{
2    sync::mpsc::{RecvTimeoutError, TryRecvError},
3    time::Duration,
4};
5
6use nu_utils::time::Instant;
7
8use nu_engine::command_prelude::*;
9
10use nu_protocol::{
11    Signals,
12    engine::{FilterTag, Mailbox},
13};
14
15#[derive(Clone)]
16pub struct JobRecv;
17
18const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
19
20impl Command for JobRecv {
21    fn name(&self) -> &str {
22        "job recv"
23    }
24
25    fn description(&self) -> &str {
26        "Read a message from a job's mailbox."
27    }
28
29    fn extra_description(&self) -> &str {
30        r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
31This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
32
33Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
34If no tag is specified, this command will accept any message.
35
36If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
37By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
38If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
39
40Note: When using par-each, only one thread at a time can utilize this command.
41In the case of two or more threads running this command, they will wait until other threads are done using it,
42in no particular order, regardless of the specified timeout parameter.
43"#
44    }
45
46    fn signature(&self) -> nu_protocol::Signature {
47        Signature::build("job recv")
48            .category(Category::Experimental)
49            .named("tag", SyntaxShape::Int, "A tag for the message.", None)
50            .named(
51                "timeout",
52                SyntaxShape::Duration,
53                "The maximum time duration to wait for.",
54                None,
55            )
56            .input_output_types(vec![(Type::Nothing, Type::Any)])
57            .allow_variants_without_examples(true)
58    }
59
60    fn search_terms(&self) -> Vec<&str> {
61        vec!["receive"]
62    }
63
64    fn run(
65        &self,
66        engine_state: &EngineState,
67        stack: &mut Stack,
68        call: &Call,
69        _input: PipelineData,
70    ) -> Result<PipelineData, ShellError> {
71        let head = call.head;
72
73        let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
74
75        if let Some(tag) = tag_arg
76            && tag.item < 0
77        {
78            return Err(ShellError::NeedsPositiveValue { span: tag.span });
79        }
80
81        let tag = tag_arg.map(|it| it.item as FilterTag);
82
83        let timeout: Option<Duration> = call.get_flag(engine_state, stack, "timeout")?;
84
85        let mut mailbox = engine_state
86            .current_job
87            .mailbox
88            .lock()
89            .expect("failed to acquire lock");
90
91        if let Some(timeout) = timeout {
92            if timeout == Duration::ZERO {
93                recv_instantly(&mut mailbox, tag, head)
94            } else {
95                recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
96            }
97        } else {
98            recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
99        }
100    }
101
102    fn examples(&self) -> Vec<Example<'_>> {
103        vec![
104            Example {
105                example: "job recv",
106                description: "Block the current thread while no message arrives.",
107                result: None,
108            },
109            Example {
110                example: "job recv --timeout 10sec",
111                description: "Receive a message, wait for at most 10 seconds.",
112                result: None,
113            },
114            Example {
115                example: "job recv --timeout 0sec",
116                description: "Get a message or fail if no message is available immediately.",
117                result: None,
118            },
119            Example {
120                example: "job spawn { sleep 1sec; 'hi' | job send 0 }; job recv",
121                description: "Receive a message from a newly-spawned job.",
122                result: None,
123            },
124        ]
125    }
126}
127
128fn recv_without_time_limit(
129    mailbox: &mut Mailbox,
130    tag: Option<FilterTag>,
131    signals: &Signals,
132    span: Span,
133) -> Result<PipelineData, ShellError> {
134    loop {
135        if signals.interrupted() {
136            return Err(ShellError::Interrupted { span });
137        }
138        match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
139            Ok(value) => return Ok(value),
140            Err(RecvTimeoutError::Timeout) => {} // try again
141            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
142        }
143    }
144}
145
146fn recv_instantly(
147    mailbox: &mut Mailbox,
148    tag: Option<FilterTag>,
149    span: Span,
150) -> Result<PipelineData, ShellError> {
151    match mailbox.try_recv(tag) {
152        Ok(value) => Ok(value),
153        Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
154        Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
155    }
156}
157
158fn recv_with_time_limit(
159    mailbox: &mut Mailbox,
160    tag: Option<FilterTag>,
161    signals: &Signals,
162    span: Span,
163    timeout: Duration,
164) -> Result<PipelineData, ShellError> {
165    let deadline = Instant::now() + timeout;
166
167    loop {
168        if signals.interrupted() {
169            return Err(ShellError::Interrupted { span });
170        }
171
172        let time_until_deadline = deadline.saturating_duration_since(Instant::now());
173
174        let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
175
176        match mailbox.recv_timeout(tag, time_to_sleep) {
177            Ok(value) => return Ok(value),
178            Err(RecvTimeoutError::Timeout) => {} // try again
179            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
180        }
181
182        if time_until_deadline.is_zero() {
183            return Err(JobError::RecvTimeout { span }.into());
184        }
185    }
186}