nu_command/experimental/
job_recv.rs

1use std::{
2    sync::mpsc::{RecvTimeoutError, TryRecvError},
3    time::{Duration, Instant},
4};
5
6use nu_engine::command_prelude::*;
7
8use nu_protocol::{
9    Signals,
10    engine::{FilterTag, Mailbox},
11};
12
/// Unit type that implements the `job recv` command (see its `Command` impl in this file).
#[derive(Clone)]
pub struct JobRecv;
15
/// Upper bound on a single blocking wait on the mailbox; the receive loops in this file
/// re-check the interrupt signal between waits of at most this length.
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
17
18impl Command for JobRecv {
19    fn name(&self) -> &str {
20        "job recv"
21    }
22
23    fn description(&self) -> &str {
24        "Read a message from the mailbox."
25    }
26
27    fn extra_description(&self) -> &str {
28        r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
29This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
30
31Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
32If no tag is specified, this command will accept any message.
33
34If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
35By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified.
36If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
37
38Note: When using par-each, only one thread at a time can utilize this command.
39In the case of two or more threads running this command, they will wait until other threads are done using it,
40in no particular order, regardless of the specified timeout parameter.
41"#
42    }
43
44    fn signature(&self) -> nu_protocol::Signature {
45        Signature::build("job recv")
46            .category(Category::Experimental)
47            .named("tag", SyntaxShape::Int, "A tag for the message", None)
48            .named(
49                "timeout",
50                SyntaxShape::Duration,
51                "The maximum time duration to wait for.",
52                None,
53            )
54            .input_output_types(vec![(Type::Nothing, Type::Any)])
55            .allow_variants_without_examples(true)
56    }
57
58    fn search_terms(&self) -> Vec<&str> {
59        vec!["receive"]
60    }
61
62    fn run(
63        &self,
64        engine_state: &EngineState,
65        stack: &mut Stack,
66        call: &Call,
67        _input: PipelineData,
68    ) -> Result<PipelineData, ShellError> {
69        let head = call.head;
70
71        let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
72
73        if let Some(tag) = tag_arg
74            && tag.item < 0
75        {
76            return Err(ShellError::NeedsPositiveValue { span: tag.span });
77        }
78
79        let tag = tag_arg.map(|it| it.item as FilterTag);
80
81        let timeout: Option<Duration> = call.get_flag(engine_state, stack, "timeout")?;
82
83        let mut mailbox = engine_state
84            .current_job
85            .mailbox
86            .lock()
87            .expect("failed to acquire lock");
88
89        if let Some(timeout) = timeout {
90            if timeout == Duration::ZERO {
91                recv_instantly(&mut mailbox, tag, head)
92            } else {
93                recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
94            }
95        } else {
96            recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
97        }
98    }
99
100    fn examples(&self) -> Vec<Example<'_>> {
101        vec![
102            Example {
103                example: "job recv",
104                description: "Block the current thread while no message arrives",
105                result: None,
106            },
107            Example {
108                example: "job recv --timeout 10sec",
109                description: "Receive a message, wait for at most 10 seconds.",
110                result: None,
111            },
112            Example {
113                example: "job recv --timeout 0sec",
114                description: "Get a message or fail if no message is available immediately",
115                result: None,
116            },
117            Example {
118                example: "job spawn { sleep 1sec; 'hi' | job send 0 }; job recv",
119                description: "Receive a message from a newly-spawned job",
120                result: None,
121            },
122        ]
123    }
124}
125
126fn recv_without_time_limit(
127    mailbox: &mut Mailbox,
128    tag: Option<FilterTag>,
129    signals: &Signals,
130    span: Span,
131) -> Result<PipelineData, ShellError> {
132    loop {
133        if signals.interrupted() {
134            return Err(ShellError::Interrupted { span });
135        }
136        match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
137            Ok(value) => return Ok(value),
138            Err(RecvTimeoutError::Timeout) => {} // try again
139            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
140        }
141    }
142}
143
144fn recv_instantly(
145    mailbox: &mut Mailbox,
146    tag: Option<FilterTag>,
147    span: Span,
148) -> Result<PipelineData, ShellError> {
149    match mailbox.try_recv(tag) {
150        Ok(value) => Ok(value),
151        Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
152        Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
153    }
154}
155
156fn recv_with_time_limit(
157    mailbox: &mut Mailbox,
158    tag: Option<FilterTag>,
159    signals: &Signals,
160    span: Span,
161    timeout: Duration,
162) -> Result<PipelineData, ShellError> {
163    let deadline = Instant::now() + timeout;
164
165    loop {
166        if signals.interrupted() {
167            return Err(ShellError::Interrupted { span });
168        }
169
170        let time_until_deadline = deadline.saturating_duration_since(Instant::now());
171
172        let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
173
174        match mailbox.recv_timeout(tag, time_to_sleep) {
175            Ok(value) => return Ok(value),
176            Err(RecvTimeoutError::Timeout) => {} // try again
177            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
178        }
179
180        if time_until_deadline.is_zero() {
181            return Err(JobError::RecvTimeout { span }.into());
182        }
183    }
184}