ockam_node/context/
receive_message.rs

1use core::sync::atomic::Ordering;
2use core::time::Duration;
3
4use ockam_core::{Message, RelayMessage, Result, Routed};
5
6use crate::debugger;
7use crate::error::*;
8use crate::tokio::time::timeout;
9use crate::{Context, DEFAULT_TIMEOUT};
10
11pub(super) enum MessageWait {
12    Timeout(Duration),
13    Blocking,
14}
15
16/// Full set of options to `send_and_receive_extended` function
17pub struct MessageReceiveOptions {
18    message_wait: MessageWait,
19}
20
21impl Default for MessageReceiveOptions {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl MessageReceiveOptions {
28    /// Default options with [`DEFAULT_TIMEOUT`]
29    pub fn new() -> Self {
30        Self {
31            message_wait: MessageWait::Timeout(DEFAULT_TIMEOUT),
32        }
33    }
34
35    pub(super) fn with_message_wait(mut self, message_wait: MessageWait) -> Self {
36        self.message_wait = message_wait;
37        self
38    }
39
40    /// Set custom timeout
41    pub fn with_timeout(mut self, timeout: Duration) -> Self {
42        self.message_wait = MessageWait::Timeout(timeout);
43        self
44    }
45
46    /// Set custom timeout in seconds
47    pub fn with_timeout_secs(mut self, timeout: u64) -> Self {
48        self.message_wait = MessageWait::Timeout(Duration::from_secs(timeout));
49        self
50    }
51
52    /// Wait for the message forever
53    pub fn without_timeout(mut self) -> Self {
54        self.message_wait = MessageWait::Blocking;
55        self
56    }
57}
58
59impl Context {
60    /// Wait for the next message from the mailbox
61    pub(crate) async fn receiver_next(&mut self) -> Result<Option<RelayMessage>> {
62        loop {
63            let relay_msg = if let Some(msg) = self.receiver.recv().await.map(|msg| {
64                trace!(address=%self.primary_address(), "received new message!");
65
66                // First we update the mailbox fill metrics
67                self.mailbox_count.fetch_sub(1, Ordering::Acquire);
68
69                msg
70            }) {
71                msg
72            } else {
73                // no more messages
74                return Ok(None);
75            };
76
77            debugger::log_incoming_message(self, &relay_msg);
78
79            if !self.mailboxes.is_incoming_authorized(&relay_msg).await? {
80                warn!(
81                    "Message received from {} for {} did not pass incoming access control",
82                    relay_msg.source(),
83                    relay_msg.destination()
84                );
85                debug!(
86                    "Message return_route: {:?} onward_route: {:?}",
87                    relay_msg.return_route(),
88                    relay_msg.onward_route()
89                );
90                continue;
91            }
92
93            return Ok(Some(relay_msg));
94        }
95    }
96
97    /// A convenience function to get a Routed message from the Mailbox
98    async fn next_from_mailbox<M: Message>(&mut self) -> Result<Routed<M>> {
99        let msg = self
100            .receiver_next()
101            .await?
102            .ok_or_else(|| NodeError::Data.not_found())?;
103        let destination_addr = msg.destination().clone();
104        let src_addr = msg.source().clone();
105        let local_msg = msg.into_local_message();
106
107        Ok(Routed::new(destination_addr, src_addr, local_msg))
108    }
109
110    /// Block the current worker to wait for a typed message
111    ///
112    /// This function may return a `Err(FailedLoadData)` if the
113    /// underlying worker was shut down, or `Err(Timeout)` if the call
114    /// was waiting for longer than the `default timeout`.
115    ///
116    /// Use [`receive_extended()`](Self::receive_extended) to use a specific timeout period.
117    ///
118    /// Will return `None` if the corresponding worker has been
119    /// stopped, or the underlying Node has shut down.
120    pub async fn receive<M: Message>(&mut self) -> Result<Routed<M>> {
121        self.receive_extended(MessageReceiveOptions::new()).await
122    }
123
124    /// Wait to receive a typed message
125    pub async fn receive_extended<M: Message>(
126        &mut self,
127        options: MessageReceiveOptions,
128    ) -> Result<Routed<M>> {
129        match options.message_wait {
130            MessageWait::Timeout(timeout_duration) => {
131                timeout(timeout_duration, async { self.next_from_mailbox().await })
132                    .await
133                    .map_err(|_| NodeError::Data.with_timeout(timeout_duration))?
134            }
135            MessageWait::Blocking => self.next_from_mailbox().await,
136        }
137    }
138}