ockam_node/context/send_message.rs
1use crate::context::MessageWait;
2use crate::error::*;
3use crate::{debugger, Context, MessageReceiveOptions, DEFAULT_TIMEOUT};
4use cfg_if::cfg_if;
5use core::time::Duration;
6use ockam_core::compat::{sync::Arc, vec::Vec};
7use ockam_core::{
8 errcode::{Kind, Origin},
9 route, Address, AllOutgoingAccessControl, AllowAll, AllowOnwardAddress, Error,
10 IncomingAccessControl, LocalMessage, Mailboxes, Message, OutgoingAccessControl, RelayMessage,
11 Result, Route, Routed,
12};
13use ockam_core::{LocalInfo, Mailbox};
14
/// Full set of options to `send_and_receive_extended` function
///
/// Instances are created with [`MessageSendReceiveOptions::new`] (or
/// `Default`) and customized through the consuming `with_*` /
/// `without_timeout` builder methods.
pub struct MessageSendReceiveOptions {
    // How the reply is awaited: bounded by a timeout or blocking forever
    message_wait: MessageWait,
    // Custom incoming access control for the temporary mailbox;
    // `send_and_receive_extended` falls back to `AllowAll` when this is `None`
    incoming_access_control: Option<Arc<dyn IncomingAccessControl>>,
    // Custom outgoing access control for the temporary mailbox;
    // `send_and_receive_extended` always adds an `AllowOnwardAddress` rule
    // for the first hop of the route, whether or not this is set
    outgoing_access_control: Option<Arc<dyn OutgoingAccessControl>>,
}
21
22impl Default for MessageSendReceiveOptions {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl MessageSendReceiveOptions {
29 /// Default options with [`DEFAULT_TIMEOUT`] and no flow control
30 pub fn new() -> Self {
31 Self {
32 message_wait: MessageWait::Timeout(DEFAULT_TIMEOUT),
33 incoming_access_control: None,
34 outgoing_access_control: None,
35 }
36 }
37
38 /// Set custom timeout
39 pub fn with_timeout(mut self, timeout: Duration) -> Self {
40 self.message_wait = MessageWait::Timeout(timeout);
41 self
42 }
43
44 /// Wait for the message forever
45 pub fn without_timeout(mut self) -> Self {
46 self.message_wait = MessageWait::Blocking;
47 self
48 }
49
50 /// Set incoming access control
51 pub fn with_incoming_access_control(
52 mut self,
53 incoming_access_control: Arc<dyn IncomingAccessControl>,
54 ) -> Self {
55 self.incoming_access_control = Some(incoming_access_control);
56 self
57 }
58
59 /// Set outgoing access control
60 pub fn with_outgoing_access_control(
61 mut self,
62 outgoing_access_control: Arc<dyn OutgoingAccessControl>,
63 ) -> Self {
64 self.outgoing_access_control = Some(outgoing_access_control);
65 self
66 }
67}
68
69impl Context {
70 /// Using a temporary new context, send a message and then receive a message
71 /// with default timeout and no flow control
72 ///
73 /// This helper function uses [`new_detached`], [`send`], and
74 /// [`receive`] internally. See their documentation for more
75 /// details.
76 ///
77 /// [`new_detached`]: Self::new_detached
78 /// [`send`]: Self::send
79 /// [`receive`]: Self::receive
80 pub async fn send_and_receive<T, R>(&self, route: impl Into<Route>, msg: T) -> Result<R>
81 where
82 T: Message,
83 R: Message,
84 {
85 self.send_and_receive_extended::<T, R>(route, msg, MessageSendReceiveOptions::new())
86 .await?
87 .into_body()
88 }
89
90 /// Using a temporary new context, send a message and then receive a message
91 ///
92 /// This helper function uses [`new_detached`], [`send`], and
93 /// [`receive`] internally. See their documentation for more
94 /// details.
95 ///
96 /// [`new_detached`]: Self::new_detached
97 /// [`send`]: Self::send
98 /// [`receive`]: Self::receive
99 pub async fn send_and_receive_extended<T, R>(
100 &self,
101 route: impl Into<Route>,
102 msg: T,
103 options: MessageSendReceiveOptions,
104 ) -> Result<Routed<R>>
105 where
106 T: Message,
107 R: Message,
108 {
109 let route: Route = route.into();
110
111 let next = route.next()?.clone();
112 let address = Address::random_tagged("Context.send_and_receive.detached");
113
114 let incoming_access_control =
115 if let Some(incoming_access_control) = options.incoming_access_control {
116 incoming_access_control
117 } else {
118 Arc::new(AllowAll)
119 };
120
121 let outgoing_access_control: Arc<dyn OutgoingAccessControl> =
122 if let Some(outgoing_access_control) = options.outgoing_access_control {
123 Arc::new(AllOutgoingAccessControl::new(vec![
124 outgoing_access_control,
125 Arc::new(AllowOnwardAddress(next.clone())),
126 ]))
127 } else {
128 Arc::new(AllowOnwardAddress(next.clone()))
129 };
130
131 let mailboxes = Mailboxes::new(
132 Mailbox::new(
133 address.clone(),
134 None,
135 incoming_access_control,
136 outgoing_access_control,
137 ),
138 vec![],
139 );
140
141 if let Some(flow_control_id) = self
142 .flow_controls
143 .find_flow_control_with_producer_address(&next)
144 .map(|x| x.flow_control_id().clone())
145 {
146 // To be able to receive the response
147 self.flow_controls.add_consumer(&address, &flow_control_id);
148 }
149
150 let mut child_ctx = self.new_detached_with_mailboxes(mailboxes)?;
151
152 #[cfg(feature = "std")]
153 child_ctx.set_tracing_context(self.tracing_context());
154
155 child_ctx.send(route, msg).await?;
156 child_ctx
157 .receive_extended::<R>(
158 MessageReceiveOptions::new().with_message_wait(options.message_wait),
159 )
160 .await
161 }
162
163 /// Send a message to another address associated with this worker
164 ///
165 /// This function is a simple wrapper around `Self::send()` which
166 /// validates the address given to it and will reject invalid
167 /// addresses.
168 pub async fn send_to_self<A, M>(&self, from: A, addr: A, msg: M) -> Result<()>
169 where
170 A: Into<Address>,
171 M: Message + Send + 'static,
172 {
173 let addr = addr.into();
174 if self.mailboxes.contains(&addr) {
175 self.send_from_address(addr, msg, from.into()).await
176 } else {
177 Err(NodeError::NodeState(NodeReason::Unknown).internal())
178 }
179 }
180
181 /// Send a message to an address or via a fully-qualified route
182 ///
183 /// Routes can be constructed from a set of [`Address`]es, or via
184 /// the [`RouteBuilder`] type. Routes can contain middleware
185 /// router addresses, which will re-address messages that need to
186 /// be handled by specific domain workers.
187 ///
188 /// [`Address`]: ockam_core::Address
189 /// [`RouteBuilder`]: ockam_core::RouteBuilder
190 ///
191 /// ```rust
192 /// # use {ockam_node::Context, ockam_core::Result}; /// #
193 /// use ockam_core::{deserialize, serialize, Decodable, Encodable, Encoded};
194 ///
195 /// async fn test(ctx: &mut Context) -> Result<()> {
196 /// use ockam_core::Message;
197 /// use serde::{Serialize, Deserialize};
198 ///
199 /// #[derive(Message, Serialize, Deserialize)]
200 /// struct MyMessage(String);
201 ///
202 /// impl MyMessage {
203 /// fn new(s: &str) -> Self {
204 /// Self(s.into())
205 /// }
206 /// }
207 ///
208 /// impl Encodable for MyMessage {
209 /// fn encode(self) -> Result<Encoded> {
210 /// Ok(serialize(self)?)
211 /// }
212 /// }
213 ///
214 /// impl Decodable for MyMessage {
215 /// fn decode(e: &[u8]) -> Result<Self> {
216 /// Ok(deserialize(e)?)
217 /// }
218 /// }
219 ///
220 /// ctx.send("my-test-worker", MyMessage::new("Hello you there :)")).await?;
221 /// Ok(())
222 /// # }
223 /// ```
224 pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>
225 where
226 R: Into<Route>,
227 M: Message,
228 {
229 self.send_from_address(route.into(), msg, self.primary_address().clone())
230 .await
231 }
232
233 /// Send a message to an address or via a fully-qualified route
234 /// after attaching the given [`LocalInfo`] to the message.
235 pub async fn send_with_local_info<R, M>(
236 &self,
237 route: R,
238 msg: M,
239 local_info: Vec<LocalInfo>,
240 ) -> Result<()>
241 where
242 R: Into<Route>,
243 M: Message,
244 {
245 self.send_from_address_impl(
246 route.into(),
247 msg,
248 self.primary_address().clone(),
249 local_info,
250 )
251 .await
252 }
253
254 /// Send a message to an address or via a fully-qualified route
255 ///
256 /// Routes can be constructed from a set of [`Address`]es, or via
257 /// the [`RouteBuilder`] type. Routes can contain middleware
258 /// router addresses, which will re-address messages that need to
259 /// be handled by specific domain workers.
260 ///
261 /// [`Address`]: ockam_core::Address
262 /// [`RouteBuilder`]: ockam_core::RouteBuilder
263 ///
264 /// This function additionally takes the sending address
265 /// parameter, to specify which of a worker's (or processor's)
266 /// addresses should be used.
267 pub async fn send_from_address<R, M>(
268 &self,
269 route: R,
270 msg: M,
271 sending_address: Address,
272 ) -> Result<()>
273 where
274 R: Into<Route>,
275 M: Message,
276 {
277 self.send_from_address_impl(route.into(), msg, sending_address, Vec::new())
278 .await
279 }
280
281 async fn send_from_address_impl<M>(
282 &self,
283 route: Route,
284 msg: M,
285 sending_address: Address,
286 local_info: Vec<LocalInfo>,
287 ) -> Result<()>
288 where
289 M: Message,
290 {
291 // Check if the sender address exists
292 if !self.mailboxes.contains(&sending_address) {
293 return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
294 }
295
296 // First resolve the next hop in the route
297 let addr = match route.next() {
298 Ok(next) => next.clone(),
299 Err(err) => {
300 // TODO: communicate bad routes to calling function
301 error!("Invalid route for message sent from {}", sending_address);
302 return Err(err);
303 }
304 };
305
306 let sender = self.router()?.resolve(&addr)?;
307
308 // Pack the payload into a TransportMessage
309 let payload = msg.encode().map_err(|_| NodeError::Data.internal())?;
310
311 // Pack transport message into a LocalMessage wrapper
312 cfg_if! {
313 if #[cfg(feature = "std")] {
314 let local_msg = LocalMessage::new()
315 // make sure to set the latest tracing context, to get the latest span id
316 .with_tracing_context(self.tracing_context().update())
317 .with_onward_route(route)
318 .with_return_route(route![sending_address.clone()])
319 .with_payload(payload)
320 .with_local_info(local_info);
321 } else {
322 let local_msg = LocalMessage::new()
323 .with_onward_route(route)
324 .with_return_route(route![sending_address.clone()])
325 .with_payload(payload)
326 .with_local_info(local_info);
327 }
328 }
329
330 // Pack local message into a RelayMessage wrapper
331 let relay_msg = RelayMessage::new(sending_address, addr, local_msg);
332
333 debugger::log_outgoing_message(self, &relay_msg);
334
335 if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
336 warn!(
337 "Message sent from {} to {} did not pass outgoing access control",
338 relay_msg.source(),
339 relay_msg.destination()
340 );
341 return Ok(());
342 }
343
344 // Send the packed user message with associated route
345 sender
346 .send(relay_msg)
347 .await
348 .map_err(NodeError::from_send_err)?;
349
350 Ok(())
351 }
352
353 /// Forward a transport message to its next routing destination
354 ///
355 /// Similar to [`Context::send`], but taking a
356 /// [`LocalMessage`], which contains the full destination
357 /// route, and calculated return route for this hop.
358 ///
359 /// **Note:** you most likely want to use
360 /// [`Context::send`] instead, unless you are writing an
361 /// external router implementation for ockam node.
362 ///
363 /// [`Context::send`]: crate::Context::send
364 /// [`LocalMessage`]: ockam_core::LocalMessage
365 pub async fn forward(&self, local_msg: LocalMessage) -> Result<()> {
366 self.forward_from_address(local_msg, self.primary_address().clone())
367 .await
368 }
369
370 /// Forward a transport message to its next routing destination
371 ///
372 /// Similar to [`Context::send`], but taking a
373 /// [`LocalMessage`], which contains the full destination
374 /// route, and calculated return route for this hop.
375 ///
376 /// **Note:** you most likely want to use
377 /// [`Context::send`] instead, unless you are writing an
378 /// external router implementation for ockam node.
379 ///
380 /// [`Context::send`]: crate::Context::send
381 /// [`LocalMessage`]: ockam_core::LocalMessage
382 pub async fn forward_from_address(
383 &self,
384 local_msg: LocalMessage,
385 sending_address: Address,
386 ) -> Result<()> {
387 // Check if the sender address exists
388 if !self.mailboxes.contains(&sending_address) {
389 return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
390 }
391
392 // First resolve the next hop in the route
393 let addr = match local_msg.onward_route().next() {
394 Ok(next) => next.clone(),
395 Err(err) => {
396 // TODO: communicate bad routes to calling function
397 error!(
398 "Invalid onward route for message forwarded from {}",
399 local_msg.return_route()
400 );
401 return Err(err);
402 }
403 };
404 let sender = self.router()?.resolve(&addr)?;
405
406 // Pack the transport message into a RelayMessage wrapper
407 let relay_msg = RelayMessage::new(sending_address, addr, local_msg);
408
409 debugger::log_outgoing_message(self, &relay_msg);
410
411 if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
412 warn!(
413 "Message forwarded from {} to {} did not pass outgoing access control",
414 relay_msg.source(),
415 relay_msg.destination(),
416 );
417 return Ok(());
418 }
419
420 // Forward the message
421 sender
422 .send(relay_msg)
423 .await
424 .map_err(NodeError::from_send_err)?;
425
426 Ok(())
427 }
428}