ockam_core/routing/message/
local_message.rs

1#[cfg(feature = "std")]
2use crate::OpenTelemetryContext;
3use crate::{
4    compat::vec::Vec, deserialize, route, serialize, Address, Decodable, Encodable, Encoded,
5    Message, Route, TransportMessage,
6};
7
8use crate::{LocalInfo, Result};
9use cfg_if::cfg_if;
10use serde::{Deserialize, Serialize};
11
/// A message type that is routed locally within a single node.
///
/// [`LocalMessage`] contains:
///  - An onward route for the message
///  - A return route
///  - A binary payload
///  - Additional metadata as [`LocalInfo`] in binary format, that can be added by Workers
///    within the same node.
///  - A tracing context, only when the `std` feature is enabled
///
/// A [`LocalMessage`] can be converted from a [`TransportMessage`] that has just been deserialized
/// from some binary data arriving on a node.
///
/// It can also be converted to a [`TransportMessage`] to be serialized and sent to another node.
///
/// When a [`LocalMessage`] has been processed by a worker, its `onward_route` and `return_route`
/// need to be modified before the message is sent to another worker. This is generally done by:
///
///  - popping the first address of the onward route (which should be the worker address)
///  - pushing a new return address at the front of the return route (this can be the current
///    worker address but this can also be the address of another worker, depending on the
///    desired topology).
///
/// Therefore, a certain number of functions are available on [`LocalMessage`] to manipulate the
/// onward and return routes:
///
/// - `pop_front_onward_route`: remove the first address of the onward route
/// - `replace_front_onward_route`: replace the first address of the onward route with another address
/// - `push_front_onward_route`: add an address at the front of the onward route
/// - `prepend_front_onward_route`: prepend a whole route at the front of the onward route
/// - `set_onward_route`: set a new route for the onward route
///
/// The return route has the matching `push_front_return_route`, `prepend_front_return_route` and
/// `set_return_route` functions, and `step_forward` combines popping the onward route with pushing
/// an address on the return route.
///
/// These functions take the message by value and return it, so they can be composed.
/// For example:
///
/// ```ignore
/// local_message.pop_front_onward_route()?.prepend_front_return_route(new_route)
/// ```
///
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Message)]
pub struct LocalMessage {
    /// Onward message route.
    pub onward_route: Route,
    /// Return message route. This field must be populated by routers handling this message along the way.
    pub return_route: Route,
    /// The message payload.
    pub payload: Vec<u8>,
    /// Local information added by workers to give additional context to the message
    /// independently of its payload. For example this can be used to store the identifier that
    /// was used to encrypt the payload.
    pub local_info: Vec<LocalInfo>,
    /// Local tracing context. Only present when the `std` feature is enabled.
    #[cfg(feature = "std")]
    pub tracing_context: OpenTelemetryContext,
}
61
62impl LocalMessage {
63    /// Return a reference to the message onward route
64    pub fn onward_route(&self) -> &Route {
65        &self.onward_route
66    }
67
68    /// Return the next address on the onward route
69    pub fn next_on_onward_route(&self) -> Result<&Address> {
70        self.onward_route.next()
71    }
72
73    /// Return true if an address exists on the onward route
74    pub fn has_next_on_onward_route(&self) -> bool {
75        self.onward_route.next().is_ok()
76    }
77
78    /// Remove the first address of the onward route
79    pub fn pop_front_onward_route(mut self) -> Result<Self> {
80        let _ = self.onward_route.step()?;
81        Ok(self)
82    }
83
84    /// Prepend an address on the onward route
85    pub fn push_front_onward_route(mut self, address: Address) -> Self {
86        self.onward_route = address + self.onward_route;
87        self
88    }
89
90    /// Replace the first address on the onward route
91    pub fn replace_front_onward_route(self, address: Address) -> Result<Self> {
92        Ok(self
93            .pop_front_onward_route()?
94            .push_front_onward_route(address))
95    }
96
97    /// Prepend a route to the onward route
98    pub fn prepend_front_onward_route(mut self, route: Route) -> Self {
99        self.onward_route = route + self.onward_route;
100        self
101    }
102
103    /// Set the message onward route
104    pub fn set_onward_route(mut self, route: Route) -> Self {
105        self.onward_route = route;
106        self
107    }
108
109    /// Return the message return route
110    pub fn return_route(&self) -> &Route {
111        &self.return_route
112    }
113
114    /// Set the message return route
115    pub fn set_return_route(mut self, route: Route) -> Self {
116        self.return_route = route;
117        self
118    }
119
120    /// Prepend an address to the return route
121    pub fn push_front_return_route(mut self, address: Address) -> Self {
122        self.return_route = address + self.return_route;
123        self
124    }
125
126    /// Prepend a route to the return route
127    pub fn prepend_front_return_route(mut self, route: Route) -> Self {
128        self.return_route = route + self.return_route;
129        self
130    }
131
132    /// Remove the first address on the onward route and push another address on the return route
133    pub fn step_forward(self, address: Address) -> Result<Self> {
134        Ok(self
135            .pop_front_onward_route()?
136            .push_front_return_route(address))
137    }
138
139    /// Return the message payload
140    pub fn into_payload(self) -> Vec<u8> {
141        self.payload
142    }
143
144    /// Return a reference to the message payload
145    pub fn payload(&self) -> &[u8] {
146        &self.payload
147    }
148
149    /// Return a mutable reference to the message payload
150    pub fn payload_mut(&mut self) -> &mut Vec<u8> {
151        &mut self.payload
152    }
153
154    /// Set the message payload
155    pub fn set_payload(mut self, payload: Vec<u8>) -> Self {
156        self.payload = payload;
157        self
158    }
159
160    /// Return a reference to the message local info
161    pub fn local_info(&self) -> &[LocalInfo] {
162        &self.local_info
163    }
164
165    /// Return a mutable reference to the message local info
166    pub fn local_info_mut(&mut self) -> &mut Vec<LocalInfo> {
167        &mut self.local_info
168    }
169
170    /// Clear all [`LocalInfo`] entries
171    pub fn clear_local_info(&mut self) {
172        self.local_info.clear()
173    }
174
175    /// Get the tracing context associated to this local message
176    #[cfg(feature = "std")]
177    pub fn tracing_context(&self) -> OpenTelemetryContext {
178        self.tracing_context.clone()
179    }
180
181    /// Create a [`LocalMessage`] from a decoded [`TransportMessage`]
182    pub fn from_transport_message(transport_message: TransportMessage) -> LocalMessage {
183        cfg_if! {
184            if #[cfg(feature = "std")] {
185                LocalMessage::new()
186                    .with_tracing_context(transport_message.tracing_context())
187                    .with_onward_route(transport_message.onward_route)
188                    .with_return_route(transport_message.return_route)
189                    .with_payload(transport_message.payload)
190                } else {
191                LocalMessage::new()
192                    .with_onward_route(transport_message.onward_route)
193                    .with_return_route(transport_message.return_route)
194                    .with_payload(transport_message.payload)
195            }
196        }
197    }
198
199    /// Create a [`TransportMessage`] from a [`LocalMessage`]
200    pub fn into_transport_message(self) -> TransportMessage {
201        let transport_message = TransportMessage::new(
202            // TODO: This whole function should go away as we move TransportMessage to individual crates
203            1,
204            self.onward_route,
205            self.return_route,
206            self.payload,
207            None,
208        );
209
210        cfg_if! {
211            if #[cfg(feature = "std")] {
212                // make sure to pass the latest tracing context
213                let new_tracing_context = Self::start_new_tracing_context(self.tracing_context.update(), "TransportMessage");
214                transport_message.with_tracing_context(new_tracing_context)
215            } else {
216                transport_message
217            }
218        }
219    }
220
221    /// - A new trace is started
222    /// - The previous trace and the new trace are linked together
223    ///
224    /// We start a new trace here in order to make sure that each transport message is always
225    /// associated to a globally unique trace id and then cannot be correlated with another transport
226    /// message that would leave the same node for example.
227    ///
228    /// We can still navigate the two created traces as one thanks to their link.
229    #[cfg(feature = "std")]
230    pub fn start_new_tracing_context(
231        tracing_context: OpenTelemetryContext,
232        span_prefix: &str,
233    ) -> String {
234        use crate::OCKAM_TRACER_NAME;
235        use opentelemetry::trace::{Link, SpanBuilder, TraceContextExt, Tracer};
236        use opentelemetry::{global, Context};
237
238        // start a new trace for this transport message, and link it to the previous trace, via the current tracing context
239        let tracer = global::tracer(OCKAM_TRACER_NAME);
240        let span_builder = SpanBuilder::from_name(format!("{}::start_trace", span_prefix))
241            .with_links(vec![Link::new(
242                tracing_context.extract().span().span_context().clone(),
243                vec![],
244                0,
245            )]);
246        let span = tracer.build_with_context(span_builder, &Context::default());
247        let cx = Context::current_with_span(span);
248
249        // create a span to close the previous trace and link it to the new trace
250        let span_builder = SpanBuilder::from_name(format!("{}::end_trace", span_prefix))
251            .with_links(vec![Link::new(cx.span().span_context().clone(), vec![], 0)]);
252        let _ = tracer.build_with_context(span_builder, &tracing_context.extract());
253
254        // create the new opentelemetry context
255        let new_tracing_context = OpenTelemetryContext::inject(&cx);
256
257        new_tracing_context.to_string()
258    }
259}
260
261impl Default for LocalMessage {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
impl Encodable for LocalMessage {
    /// Consume the message and encode it by handing it to the crate's `serialize` function.
    ///
    /// Any error reported by `serialize` is returned unchanged.
    fn encode(self) -> Result<Encoded> {
        serialize(self)
    }
}
272
273impl Decodable for LocalMessage {
274    fn decode(e: &[u8]) -> Result<Self> {
275        deserialize(e)
276    }
277}
278
279impl LocalMessage {
280    /// Create a new `LocalMessage` from the provided transport message and local information.
281    fn make(
282        onward_route: Route,
283        return_route: Route,
284        payload: Vec<u8>,
285        local_info: Vec<LocalInfo>,
286    ) -> Self {
287        LocalMessage {
288            onward_route,
289            return_route,
290            payload,
291            local_info,
292            #[cfg(feature = "std")]
293            tracing_context: OpenTelemetryContext::current(),
294        }
295    }
296
297    /// Create a `LocalMessage` with default values, in order to build it with
298    /// the withXXX methods
299    pub fn new() -> Self {
300        LocalMessage::make(route![], route![], vec![], vec![])
301    }
302
303    /// Specify the onward route for the message
304    pub fn with_onward_route(self, onward_route: Route) -> Self {
305        Self {
306            onward_route,
307            ..self
308        }
309    }
310
311    /// Specify the return route for the message
312    pub fn with_return_route(self, return_route: Route) -> Self {
313        Self {
314            return_route,
315            ..self
316        }
317    }
318
319    /// Specify the payload for the message
320    pub fn with_payload(self, payload: Vec<u8>) -> Self {
321        Self { payload, ..self }
322    }
323
324    /// Specify the local information for the message
325    pub fn with_local_info(self, local_info: Vec<LocalInfo>) -> Self {
326        Self { local_info, ..self }
327    }
328
329    /// Specify the tracing context
330    #[cfg(feature = "std")]
331    pub fn with_tracing_context(self, tracing_context: OpenTelemetryContext) -> Self {
332        Self {
333            tracing_context,
334            ..self
335        }
336    }
337}