ockam_node/context/
send_message.rs

1use crate::context::MessageWait;
2use crate::error::*;
3use crate::{debugger, Context, MessageReceiveOptions, DEFAULT_TIMEOUT};
4use cfg_if::cfg_if;
5use core::time::Duration;
6use ockam_core::compat::{sync::Arc, vec::Vec};
7use ockam_core::{
8    errcode::{Kind, Origin},
9    route, Address, AllOutgoingAccessControl, AllowAll, AllowOnwardAddress, Error,
10    IncomingAccessControl, LocalMessage, Mailboxes, Message, OutgoingAccessControl, RelayMessage,
11    Result, Route, Routed,
12};
13use ockam_core::{LocalInfo, Mailbox};
14
15/// Full set of options to `send_and_receive_extended` function
16pub struct MessageSendReceiveOptions {
17    message_wait: MessageWait,
18    incoming_access_control: Option<Arc<dyn IncomingAccessControl>>,
19    outgoing_access_control: Option<Arc<dyn OutgoingAccessControl>>,
20}
21
22impl Default for MessageSendReceiveOptions {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl MessageSendReceiveOptions {
29    /// Default options with [`DEFAULT_TIMEOUT`] and no flow control
30    pub fn new() -> Self {
31        Self {
32            message_wait: MessageWait::Timeout(DEFAULT_TIMEOUT),
33            incoming_access_control: None,
34            outgoing_access_control: None,
35        }
36    }
37
38    /// Set custom timeout
39    pub fn with_timeout(mut self, timeout: Duration) -> Self {
40        self.message_wait = MessageWait::Timeout(timeout);
41        self
42    }
43
44    /// Wait for the message forever
45    pub fn without_timeout(mut self) -> Self {
46        self.message_wait = MessageWait::Blocking;
47        self
48    }
49
50    /// Set incoming access control
51    pub fn with_incoming_access_control(
52        mut self,
53        incoming_access_control: Arc<dyn IncomingAccessControl>,
54    ) -> Self {
55        self.incoming_access_control = Some(incoming_access_control);
56        self
57    }
58
59    /// Set outgoing access control
60    pub fn with_outgoing_access_control(
61        mut self,
62        outgoing_access_control: Arc<dyn OutgoingAccessControl>,
63    ) -> Self {
64        self.outgoing_access_control = Some(outgoing_access_control);
65        self
66    }
67}
68
69impl Context {
70    /// Using a temporary new context, send a message and then receive a message
71    /// with default timeout and no flow control
72    ///
73    /// This helper function uses [`new_detached`], [`send`], and
74    /// [`receive`] internally. See their documentation for more
75    /// details.
76    ///
77    /// [`new_detached`]: Self::new_detached
78    /// [`send`]: Self::send
79    /// [`receive`]: Self::receive
80    pub async fn send_and_receive<T, R>(&self, route: impl Into<Route>, msg: T) -> Result<R>
81    where
82        T: Message,
83        R: Message,
84    {
85        self.send_and_receive_extended::<T, R>(route, msg, MessageSendReceiveOptions::new())
86            .await?
87            .into_body()
88    }
89
90    /// Using a temporary new context, send a message and then receive a message
91    ///
92    /// This helper function uses [`new_detached`], [`send`], and
93    /// [`receive`] internally. See their documentation for more
94    /// details.
95    ///
96    /// [`new_detached`]: Self::new_detached
97    /// [`send`]: Self::send
98    /// [`receive`]: Self::receive
99    pub async fn send_and_receive_extended<T, R>(
100        &self,
101        route: impl Into<Route>,
102        msg: T,
103        options: MessageSendReceiveOptions,
104    ) -> Result<Routed<R>>
105    where
106        T: Message,
107        R: Message,
108    {
109        let route: Route = route.into();
110
111        let next = route.next()?.clone();
112        let address = Address::random_tagged("Context.send_and_receive.detached");
113
114        let incoming_access_control =
115            if let Some(incoming_access_control) = options.incoming_access_control {
116                incoming_access_control
117            } else {
118                Arc::new(AllowAll)
119            };
120
121        let outgoing_access_control: Arc<dyn OutgoingAccessControl> =
122            if let Some(outgoing_access_control) = options.outgoing_access_control {
123                Arc::new(AllOutgoingAccessControl::new(vec![
124                    outgoing_access_control,
125                    Arc::new(AllowOnwardAddress(next.clone())),
126                ]))
127            } else {
128                Arc::new(AllowOnwardAddress(next.clone()))
129            };
130
131        let mailboxes = Mailboxes::new(
132            Mailbox::new(
133                address.clone(),
134                None,
135                incoming_access_control,
136                outgoing_access_control,
137            ),
138            vec![],
139        );
140
141        if let Some(flow_control_id) = self
142            .flow_controls
143            .find_flow_control_with_producer_address(&next)
144            .map(|x| x.flow_control_id().clone())
145        {
146            // To be able to receive the response
147            self.flow_controls.add_consumer(&address, &flow_control_id);
148        }
149
150        let mut child_ctx = self.new_detached_with_mailboxes(mailboxes)?;
151
152        #[cfg(feature = "std")]
153        child_ctx.set_tracing_context(self.tracing_context());
154
155        child_ctx.send(route, msg).await?;
156        child_ctx
157            .receive_extended::<R>(
158                MessageReceiveOptions::new().with_message_wait(options.message_wait),
159            )
160            .await
161    }
162
163    /// Send a message to another address associated with this worker
164    ///
165    /// This function is a simple wrapper around `Self::send()` which
166    /// validates the address given to it and will reject invalid
167    /// addresses.
168    pub async fn send_to_self<A, M>(&self, from: A, addr: A, msg: M) -> Result<()>
169    where
170        A: Into<Address>,
171        M: Message + Send + 'static,
172    {
173        let addr = addr.into();
174        if self.mailboxes.contains(&addr) {
175            self.send_from_address(addr, msg, from.into()).await
176        } else {
177            Err(NodeError::NodeState(NodeReason::Unknown).internal())
178        }
179    }
180
181    /// Send a message to an address or via a fully-qualified route
182    ///
183    /// Routes can be constructed from a set of [`Address`]es, or via
184    /// the [`RouteBuilder`] type.  Routes can contain middleware
185    /// router addresses, which will re-address messages that need to
186    /// be handled by specific domain workers.
187    ///
188    /// [`Address`]: ockam_core::Address
189    /// [`RouteBuilder`]: ockam_core::RouteBuilder
190    ///
191    /// ```rust
192    /// # use {ockam_node::Context, ockam_core::Result};    /// #
193    /// use ockam_core::{deserialize, serialize, Decodable, Encodable, Encoded};
194    ///
195    /// async fn test(ctx: &mut Context) -> Result<()> {
196    /// use ockam_core::Message;
197    /// use serde::{Serialize, Deserialize};
198    ///
199    /// #[derive(Message, Serialize, Deserialize)]
200    /// struct MyMessage(String);
201    ///
202    /// impl MyMessage {
203    ///     fn new(s: &str) -> Self {
204    ///         Self(s.into())
205    ///     }
206    /// }
207    ///
208    /// impl Encodable for MyMessage {
209    ///     fn encode(self) -> Result<Encoded> {
210    ///         Ok(serialize(self)?)
211    ///     }
212    /// }
213    ///
214    /// impl Decodable for MyMessage {
215    ///     fn decode(e: &[u8]) -> Result<Self> {
216    ///         Ok(deserialize(e)?)
217    ///     }
218    /// }
219    ///
220    /// ctx.send("my-test-worker", MyMessage::new("Hello you there :)")).await?;
221    /// Ok(())
222    /// # }
223    /// ```
224    pub async fn send<R, M>(&self, route: R, msg: M) -> Result<()>
225    where
226        R: Into<Route>,
227        M: Message,
228    {
229        self.send_from_address(route.into(), msg, self.primary_address().clone())
230            .await
231    }
232
233    /// Send a message to an address or via a fully-qualified route
234    /// after attaching the given [`LocalInfo`] to the message.
235    pub async fn send_with_local_info<R, M>(
236        &self,
237        route: R,
238        msg: M,
239        local_info: Vec<LocalInfo>,
240    ) -> Result<()>
241    where
242        R: Into<Route>,
243        M: Message,
244    {
245        self.send_from_address_impl(
246            route.into(),
247            msg,
248            self.primary_address().clone(),
249            local_info,
250        )
251        .await
252    }
253
254    /// Send a message to an address or via a fully-qualified route
255    ///
256    /// Routes can be constructed from a set of [`Address`]es, or via
257    /// the [`RouteBuilder`] type.  Routes can contain middleware
258    /// router addresses, which will re-address messages that need to
259    /// be handled by specific domain workers.
260    ///
261    /// [`Address`]: ockam_core::Address
262    /// [`RouteBuilder`]: ockam_core::RouteBuilder
263    ///
264    /// This function additionally takes the sending address
265    /// parameter, to specify which of a worker's (or processor's)
266    /// addresses should be used.
267    pub async fn send_from_address<R, M>(
268        &self,
269        route: R,
270        msg: M,
271        sending_address: Address,
272    ) -> Result<()>
273    where
274        R: Into<Route>,
275        M: Message,
276    {
277        self.send_from_address_impl(route.into(), msg, sending_address, Vec::new())
278            .await
279    }
280
281    async fn send_from_address_impl<M>(
282        &self,
283        route: Route,
284        msg: M,
285        sending_address: Address,
286        local_info: Vec<LocalInfo>,
287    ) -> Result<()>
288    where
289        M: Message,
290    {
291        // Check if the sender address exists
292        if !self.mailboxes.contains(&sending_address) {
293            return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
294        }
295
296        // First resolve the next hop in the route
297        let addr = match route.next() {
298            Ok(next) => next.clone(),
299            Err(err) => {
300                // TODO: communicate bad routes to calling function
301                error!("Invalid route for message sent from {}", sending_address);
302                return Err(err);
303            }
304        };
305
306        let sender = self.router()?.resolve(&addr)?;
307
308        // Pack the payload into a TransportMessage
309        let payload = msg.encode().map_err(|_| NodeError::Data.internal())?;
310
311        // Pack transport message into a LocalMessage wrapper
312        cfg_if! {
313            if #[cfg(feature = "std")] {
314                let local_msg = LocalMessage::new()
315                    // make sure to set the latest tracing context, to get the latest span id
316                    .with_tracing_context(self.tracing_context().update())
317                    .with_onward_route(route)
318                    .with_return_route(route![sending_address.clone()])
319                    .with_payload(payload)
320                    .with_local_info(local_info);
321            } else {
322                let local_msg = LocalMessage::new()
323                    .with_onward_route(route)
324                    .with_return_route(route![sending_address.clone()])
325                    .with_payload(payload)
326                    .with_local_info(local_info);
327            }
328        }
329
330        // Pack local message into a RelayMessage wrapper
331        let relay_msg = RelayMessage::new(sending_address, addr, local_msg);
332
333        debugger::log_outgoing_message(self, &relay_msg);
334
335        if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
336            warn!(
337                "Message sent from {} to {} did not pass outgoing access control",
338                relay_msg.source(),
339                relay_msg.destination()
340            );
341            return Ok(());
342        }
343
344        // Send the packed user message with associated route
345        sender
346            .send(relay_msg)
347            .await
348            .map_err(NodeError::from_send_err)?;
349
350        Ok(())
351    }
352
353    /// Forward a transport message to its next routing destination
354    ///
355    /// Similar to [`Context::send`], but taking a
356    /// [`LocalMessage`], which contains the full destination
357    /// route, and calculated return route for this hop.
358    ///
359    /// **Note:** you most likely want to use
360    /// [`Context::send`] instead, unless you are writing an
361    /// external router implementation for ockam node.
362    ///
363    /// [`Context::send`]: crate::Context::send
364    /// [`LocalMessage`]: ockam_core::LocalMessage
365    pub async fn forward(&self, local_msg: LocalMessage) -> Result<()> {
366        self.forward_from_address(local_msg, self.primary_address().clone())
367            .await
368    }
369
370    /// Forward a transport message to its next routing destination
371    ///
372    /// Similar to [`Context::send`], but taking a
373    /// [`LocalMessage`], which contains the full destination
374    /// route, and calculated return route for this hop.
375    ///
376    /// **Note:** you most likely want to use
377    /// [`Context::send`] instead, unless you are writing an
378    /// external router implementation for ockam node.
379    ///
380    /// [`Context::send`]: crate::Context::send
381    /// [`LocalMessage`]: ockam_core::LocalMessage
382    pub async fn forward_from_address(
383        &self,
384        local_msg: LocalMessage,
385        sending_address: Address,
386    ) -> Result<()> {
387        // Check if the sender address exists
388        if !self.mailboxes.contains(&sending_address) {
389            return Err(Error::new_without_cause(Origin::Node, Kind::Invalid));
390        }
391
392        // First resolve the next hop in the route
393        let addr = match local_msg.onward_route().next() {
394            Ok(next) => next.clone(),
395            Err(err) => {
396                // TODO: communicate bad routes to calling function
397                error!(
398                    "Invalid onward route for message forwarded from {}",
399                    local_msg.return_route()
400                );
401                return Err(err);
402            }
403        };
404        let sender = self.router()?.resolve(&addr)?;
405
406        // Pack the transport message into a RelayMessage wrapper
407        let relay_msg = RelayMessage::new(sending_address, addr, local_msg);
408
409        debugger::log_outgoing_message(self, &relay_msg);
410
411        if !self.mailboxes.is_outgoing_authorized(&relay_msg).await? {
412            warn!(
413                "Message forwarded from {} to {} did not pass outgoing access control",
414                relay_msg.source(),
415                relay_msg.destination(),
416            );
417            return Ok(());
418        }
419
420        // Forward the message
421        sender
422            .send(relay_msg)
423            .await
424            .map_err(NodeError::from_send_err)?;
425
426        Ok(())
427    }
428}