ockam_core/routing/message/
local_message.rs1#[cfg(feature = "std")]
2use crate::OpenTelemetryContext;
3use crate::{
4 compat::vec::Vec, deserialize, route, serialize, Address, Decodable, Encodable, Encoded,
5 Message, Route, TransportMessage,
6};
7
8use crate::{LocalInfo, Result};
9use cfg_if::cfg_if;
10use serde::{Deserialize, Serialize};
11
12#[derive(Serialize, Deserialize, Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Message)]
46pub struct LocalMessage {
47 pub onward_route: Route,
49 pub return_route: Route,
51 pub payload: Vec<u8>,
53 pub local_info: Vec<LocalInfo>,
57 #[cfg(feature = "std")]
59 pub tracing_context: OpenTelemetryContext,
60}
61
62impl LocalMessage {
63 pub fn onward_route(&self) -> &Route {
65 &self.onward_route
66 }
67
68 pub fn next_on_onward_route(&self) -> Result<&Address> {
70 self.onward_route.next()
71 }
72
73 pub fn has_next_on_onward_route(&self) -> bool {
75 self.onward_route.next().is_ok()
76 }
77
78 pub fn pop_front_onward_route(mut self) -> Result<Self> {
80 let _ = self.onward_route.step()?;
81 Ok(self)
82 }
83
84 pub fn push_front_onward_route(mut self, address: Address) -> Self {
86 self.onward_route = address + self.onward_route;
87 self
88 }
89
90 pub fn replace_front_onward_route(self, address: Address) -> Result<Self> {
92 Ok(self
93 .pop_front_onward_route()?
94 .push_front_onward_route(address))
95 }
96
97 pub fn prepend_front_onward_route(mut self, route: Route) -> Self {
99 self.onward_route = route + self.onward_route;
100 self
101 }
102
103 pub fn set_onward_route(mut self, route: Route) -> Self {
105 self.onward_route = route;
106 self
107 }
108
109 pub fn return_route(&self) -> &Route {
111 &self.return_route
112 }
113
114 pub fn set_return_route(mut self, route: Route) -> Self {
116 self.return_route = route;
117 self
118 }
119
120 pub fn push_front_return_route(mut self, address: Address) -> Self {
122 self.return_route = address + self.return_route;
123 self
124 }
125
126 pub fn prepend_front_return_route(mut self, route: Route) -> Self {
128 self.return_route = route + self.return_route;
129 self
130 }
131
132 pub fn step_forward(self, address: Address) -> Result<Self> {
134 Ok(self
135 .pop_front_onward_route()?
136 .push_front_return_route(address))
137 }
138
139 pub fn into_payload(self) -> Vec<u8> {
141 self.payload
142 }
143
144 pub fn payload(&self) -> &[u8] {
146 &self.payload
147 }
148
149 pub fn payload_mut(&mut self) -> &mut Vec<u8> {
151 &mut self.payload
152 }
153
154 pub fn set_payload(mut self, payload: Vec<u8>) -> Self {
156 self.payload = payload;
157 self
158 }
159
160 pub fn local_info(&self) -> &[LocalInfo] {
162 &self.local_info
163 }
164
165 pub fn local_info_mut(&mut self) -> &mut Vec<LocalInfo> {
167 &mut self.local_info
168 }
169
170 pub fn clear_local_info(&mut self) {
172 self.local_info.clear()
173 }
174
175 #[cfg(feature = "std")]
177 pub fn tracing_context(&self) -> OpenTelemetryContext {
178 self.tracing_context.clone()
179 }
180
181 pub fn from_transport_message(transport_message: TransportMessage) -> LocalMessage {
183 cfg_if! {
184 if #[cfg(feature = "std")] {
185 LocalMessage::new()
186 .with_tracing_context(transport_message.tracing_context())
187 .with_onward_route(transport_message.onward_route)
188 .with_return_route(transport_message.return_route)
189 .with_payload(transport_message.payload)
190 } else {
191 LocalMessage::new()
192 .with_onward_route(transport_message.onward_route)
193 .with_return_route(transport_message.return_route)
194 .with_payload(transport_message.payload)
195 }
196 }
197 }
198
199 pub fn into_transport_message(self) -> TransportMessage {
201 let transport_message = TransportMessage::new(
202 1,
204 self.onward_route,
205 self.return_route,
206 self.payload,
207 None,
208 );
209
210 cfg_if! {
211 if #[cfg(feature = "std")] {
212 let new_tracing_context = Self::start_new_tracing_context(self.tracing_context.update(), "TransportMessage");
214 transport_message.with_tracing_context(new_tracing_context)
215 } else {
216 transport_message
217 }
218 }
219 }
220
221 #[cfg(feature = "std")]
230 pub fn start_new_tracing_context(
231 tracing_context: OpenTelemetryContext,
232 span_prefix: &str,
233 ) -> String {
234 use crate::OCKAM_TRACER_NAME;
235 use opentelemetry::trace::{Link, SpanBuilder, TraceContextExt, Tracer};
236 use opentelemetry::{global, Context};
237
238 let tracer = global::tracer(OCKAM_TRACER_NAME);
240 let span_builder = SpanBuilder::from_name(format!("{}::start_trace", span_prefix))
241 .with_links(vec![Link::new(
242 tracing_context.extract().span().span_context().clone(),
243 vec![],
244 0,
245 )]);
246 let span = tracer.build_with_context(span_builder, &Context::default());
247 let cx = Context::current_with_span(span);
248
249 let span_builder = SpanBuilder::from_name(format!("{}::end_trace", span_prefix))
251 .with_links(vec![Link::new(cx.span().span_context().clone(), vec![], 0)]);
252 let _ = tracer.build_with_context(span_builder, &tracing_context.extract());
253
254 let new_tracing_context = OpenTelemetryContext::inject(&cx);
256
257 new_tracing_context.to_string()
258 }
259}
260
261impl Default for LocalMessage {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267impl Encodable for LocalMessage {
268 fn encode(self) -> Result<Encoded> {
269 serialize(self)
270 }
271}
272
273impl Decodable for LocalMessage {
274 fn decode(e: &[u8]) -> Result<Self> {
275 deserialize(e)
276 }
277}
278
279impl LocalMessage {
280 fn make(
282 onward_route: Route,
283 return_route: Route,
284 payload: Vec<u8>,
285 local_info: Vec<LocalInfo>,
286 ) -> Self {
287 LocalMessage {
288 onward_route,
289 return_route,
290 payload,
291 local_info,
292 #[cfg(feature = "std")]
293 tracing_context: OpenTelemetryContext::current(),
294 }
295 }
296
297 pub fn new() -> Self {
300 LocalMessage::make(route![], route![], vec![], vec![])
301 }
302
303 pub fn with_onward_route(self, onward_route: Route) -> Self {
305 Self {
306 onward_route,
307 ..self
308 }
309 }
310
311 pub fn with_return_route(self, return_route: Route) -> Self {
313 Self {
314 return_route,
315 ..self
316 }
317 }
318
319 pub fn with_payload(self, payload: Vec<u8>) -> Self {
321 Self { payload, ..self }
322 }
323
324 pub fn with_local_info(self, local_info: Vec<LocalInfo>) -> Self {
326 Self { local_info, ..self }
327 }
328
329 #[cfg(feature = "std")]
331 pub fn with_tracing_context(self, tracing_context: OpenTelemetryContext) -> Self {
332 Self {
333 tracing_context,
334 ..self
335 }
336 }
337}