nu_command/experimental/
job_recv.rs

1use std::{
2    sync::mpsc::{RecvTimeoutError, TryRecvError},
3    time::{Duration, Instant},
4};
5
6use nu_engine::command_prelude::*;
7
8use nu_protocol::{
9    Signals,
10    engine::{FilterTag, Mailbox},
11};
12
/// Marker type for the `job recv` command.
///
/// It carries no state; all behavior lives in its `Command` implementation.
#[derive(Clone)]
pub struct JobRecv;
15
/// Longest single wait on the mailbox before the receive loops wake up and
/// re-check whether the command has been interrupted.
const CTRL_C_CHECK_INTERVAL: Duration = Duration::from_millis(100);
17
18impl Command for JobRecv {
19    fn name(&self) -> &str {
20        "job recv"
21    }
22
23    fn description(&self) -> &str {
24        "Read a message from the mailbox."
25    }
26
27    fn extra_description(&self) -> &str {
28        r#"When messages are sent to the current process, they get stored in what is called the "mailbox".
29This commands reads and returns a message from the mailbox, in a first-in-first-out fashion.
30
31Messages may have numeric flags attached to them. This commands supports filtering out messages that do not satisfy a given tag, by using the `tag` flag.
32If no tag is specified, this command will accept any message.
33
34If no message with the specified tag (if any) is available in the mailbox, this command will block the current thread until one arrives.
35By default this command block indefinitely until a matching message arrives, but a timeout duration can be specified. 
36If a timeout duration of zero is specified, it will succeed only if there already is a message in the mailbox.
37
38Note: When using par-each, only one thread at a time can utilize this command.
39In the case of two or more threads running this command, they will wait until other threads are done using it,
40in no particular order, regardless of the specified timeout parameter.
41"#
42    }
43
44    fn signature(&self) -> nu_protocol::Signature {
45        Signature::build("job recv")
46            .category(Category::Experimental)
47            .named("tag", SyntaxShape::Int, "A tag for the message", None)
48            .named(
49                "timeout",
50                SyntaxShape::Duration,
51                "The maximum time duration to wait for.",
52                None,
53            )
54            .input_output_types(vec![(Type::Nothing, Type::Any)])
55            .allow_variants_without_examples(true)
56    }
57
58    fn search_terms(&self) -> Vec<&str> {
59        vec!["receive"]
60    }
61
62    fn run(
63        &self,
64        engine_state: &EngineState,
65        stack: &mut Stack,
66        call: &Call,
67        _input: PipelineData,
68    ) -> Result<PipelineData, ShellError> {
69        let head = call.head;
70
71        let tag_arg: Option<Spanned<i64>> = call.get_flag(engine_state, stack, "tag")?;
72
73        if let Some(tag) = tag_arg {
74            if tag.item < 0 {
75                return Err(ShellError::NeedsPositiveValue { span: tag.span });
76            }
77        }
78
79        let tag = tag_arg.map(|it| it.item as FilterTag);
80
81        let duration: Option<i64> = call.get_flag(engine_state, stack, "timeout")?;
82
83        let timeout = duration.map(|it| Duration::from_nanos(it as u64));
84
85        let mut mailbox = engine_state
86            .current_job
87            .mailbox
88            .lock()
89            .expect("failed to acquire lock");
90
91        if let Some(timeout) = timeout {
92            if timeout == Duration::ZERO {
93                recv_instantly(&mut mailbox, tag, head)
94            } else {
95                recv_with_time_limit(&mut mailbox, tag, engine_state.signals(), head, timeout)
96            }
97        } else {
98            recv_without_time_limit(&mut mailbox, tag, engine_state.signals(), head)
99        }
100    }
101
102    fn examples(&self) -> Vec<Example> {
103        vec![
104            Example {
105                example: "job recv",
106                description: "Block the current thread while no message arrives",
107                result: None,
108            },
109            Example {
110                example: "job recv --timeout 10sec",
111                description: "Receive a message, wait for at most 10 seconds.",
112                result: None,
113            },
114            Example {
115                example: "job recv --timeout 0sec",
116                description: "Get a message or fail if no message is available immediately",
117                result: None,
118            },
119        ]
120    }
121}
122
123fn recv_without_time_limit(
124    mailbox: &mut Mailbox,
125    tag: Option<FilterTag>,
126    signals: &Signals,
127    span: Span,
128) -> Result<PipelineData, ShellError> {
129    loop {
130        if signals.interrupted() {
131            return Err(ShellError::Interrupted { span });
132        }
133        match mailbox.recv_timeout(tag, CTRL_C_CHECK_INTERVAL) {
134            Ok(value) => return Ok(value),
135            Err(RecvTimeoutError::Timeout) => {} // try again
136            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
137        }
138    }
139}
140
141fn recv_instantly(
142    mailbox: &mut Mailbox,
143    tag: Option<FilterTag>,
144    span: Span,
145) -> Result<PipelineData, ShellError> {
146    match mailbox.try_recv(tag) {
147        Ok(value) => Ok(value),
148        Err(TryRecvError::Empty) => Err(JobError::RecvTimeout { span }.into()),
149        Err(TryRecvError::Disconnected) => Err(ShellError::Interrupted { span }),
150    }
151}
152
153fn recv_with_time_limit(
154    mailbox: &mut Mailbox,
155    tag: Option<FilterTag>,
156    signals: &Signals,
157    span: Span,
158    timeout: Duration,
159) -> Result<PipelineData, ShellError> {
160    let deadline = Instant::now() + timeout;
161
162    loop {
163        if signals.interrupted() {
164            return Err(ShellError::Interrupted { span });
165        }
166
167        let time_until_deadline = deadline.saturating_duration_since(Instant::now());
168
169        let time_to_sleep = time_until_deadline.min(CTRL_C_CHECK_INTERVAL);
170
171        match mailbox.recv_timeout(tag, time_to_sleep) {
172            Ok(value) => return Ok(value),
173            Err(RecvTimeoutError::Timeout) => {} // try again
174            Err(RecvTimeoutError::Disconnected) => return Err(ShellError::Interrupted { span }),
175        }
176
177        if time_until_deadline.is_zero() {
178            return Err(JobError::RecvTimeout { span }.into());
179        }
180    }
181}