// ockam_node/context/receive_message.rs
use core::sync::atomic::Ordering;
2use core::time::Duration;
3
4use ockam_core::{Message, RelayMessage, Result, Routed};
5
6use crate::debugger;
7use crate::error::*;
8use crate::tokio::time::timeout;
9use crate::{Context, DEFAULT_TIMEOUT};
10
11pub(super) enum MessageWait {
12 Timeout(Duration),
13 Blocking,
14}
15
16pub struct MessageReceiveOptions {
18 message_wait: MessageWait,
19}
20
21impl Default for MessageReceiveOptions {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl MessageReceiveOptions {
28 pub fn new() -> Self {
30 Self {
31 message_wait: MessageWait::Timeout(DEFAULT_TIMEOUT),
32 }
33 }
34
35 pub(super) fn with_message_wait(mut self, message_wait: MessageWait) -> Self {
36 self.message_wait = message_wait;
37 self
38 }
39
40 pub fn with_timeout(mut self, timeout: Duration) -> Self {
42 self.message_wait = MessageWait::Timeout(timeout);
43 self
44 }
45
46 pub fn with_timeout_secs(mut self, timeout: u64) -> Self {
48 self.message_wait = MessageWait::Timeout(Duration::from_secs(timeout));
49 self
50 }
51
52 pub fn without_timeout(mut self) -> Self {
54 self.message_wait = MessageWait::Blocking;
55 self
56 }
57}
58
59impl Context {
60 pub(crate) async fn receiver_next(&mut self) -> Result<Option<RelayMessage>> {
62 loop {
63 let relay_msg = if let Some(msg) = self.receiver.recv().await.map(|msg| {
64 trace!(address=%self.primary_address(), "received new message!");
65
66 self.mailbox_count.fetch_sub(1, Ordering::Acquire);
68
69 msg
70 }) {
71 msg
72 } else {
73 return Ok(None);
75 };
76
77 debugger::log_incoming_message(self, &relay_msg);
78
79 if !self.mailboxes.is_incoming_authorized(&relay_msg).await? {
80 warn!(
81 "Message received from {} for {} did not pass incoming access control",
82 relay_msg.source(),
83 relay_msg.destination()
84 );
85 debug!(
86 "Message return_route: {:?} onward_route: {:?}",
87 relay_msg.return_route(),
88 relay_msg.onward_route()
89 );
90 continue;
91 }
92
93 return Ok(Some(relay_msg));
94 }
95 }
96
97 async fn next_from_mailbox<M: Message>(&mut self) -> Result<Routed<M>> {
99 let msg = self
100 .receiver_next()
101 .await?
102 .ok_or_else(|| NodeError::Data.not_found())?;
103 let destination_addr = msg.destination().clone();
104 let src_addr = msg.source().clone();
105 let local_msg = msg.into_local_message();
106
107 Ok(Routed::new(destination_addr, src_addr, local_msg))
108 }
109
110 pub async fn receive<M: Message>(&mut self) -> Result<Routed<M>> {
121 self.receive_extended(MessageReceiveOptions::new()).await
122 }
123
124 pub async fn receive_extended<M: Message>(
126 &mut self,
127 options: MessageReceiveOptions,
128 ) -> Result<Routed<M>> {
129 match options.message_wait {
130 MessageWait::Timeout(timeout_duration) => {
131 timeout(timeout_duration, async { self.next_from_mailbox().await })
132 .await
133 .map_err(|_| NodeError::Data.with_timeout(timeout_duration))?
134 }
135 MessageWait::Blocking => self.next_from_mailbox().await,
136 }
137 }
138}