#[cfg(feature = "std")]
use crate::OpenTelemetryContext;
#[cfg(feature = "tracing_context")]
use crate::OCKAM_TRACER_NAME;
use crate::{compat::vec::Vec, Message, Route};
#[cfg(feature = "std")]
use cfg_if::cfg_if;
use core::fmt::{self, Display, Formatter};
#[cfg(feature = "tracing_context")]
use opentelemetry::{
global,
trace::{Link, SpanBuilder, TraceContextExt, Tracer},
Context,
};
use serde::{Deserialize, Serialize};
/// A generic transport message type.
///
/// This type is exposed in `ockam_core` (and the root `ockam` crate) in
/// order to provide a mechanism for third-party developers to create
/// custom transport channel routers.
///
/// Casual users of Ockam should never have to interact with this type
/// directly.
///
/// # Examples
///
/// See `ockam_transport_tcp::workers::sender::TcpSendWorker` for a usage example.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Message)]
pub struct TransportMessage {
/// The transport protocol version.
pub version: u8,
/// Onward message route.
pub onward_route: Route,
/// Return message route.
///
/// This field must be populated by routers handling this message
/// along the way.
pub return_route: Route,
/// The message payload.
pub payload: Vec<u8>,
/// An optional tracing context, serialized as a string.
#[cfg(feature = "tracing_context")]
pub tracing_context: Option<String>,
}
impl TransportMessage {
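/// Create a new v1 transport message from the given onward route, return route, and payload.
///
/// When the `tracing_context` feature is enabled, the tracing context is left unset.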
pub fn v1(
onward_route: impl Into<Route>,
return_route: impl Into<Route>,
payload: Vec<u8>,
) -> Self {
Self {
version: 1,
onward_route: onward_route.into(),
return_route: return_route.into(),
payload,
#[cfg(feature = "tracing_context")]
tracing_context: None,
}
}
/// Return a TransportMessage with a new tracing context:
/// - A new trace is started
/// - The previous trace and the new trace are linked together
///
/// A new trace is started here so that each transport message is always associated with a
/// globally unique trace id and therefore cannot be correlated with another transport message,
/// for example one leaving the same node.
///
/// The two traces can still be navigated as one thanks to their link.
///
/// Without the `tracing_context` feature, the message is returned unchanged.
#[cfg(feature = "std")]
pub fn start_new_tracing_context(self, _tracing_context: OpenTelemetryContext) -> Self {
cfg_if! {
if #[cfg(feature = "tracing_context")] {
// start a new trace for this transport message, and link it to the previous trace, via the current tracing context
let tracer = global::tracer(OCKAM_TRACER_NAME);
let span_builder = SpanBuilder::from_name("TransportMessage::start_trace")
.with_links(vec![Link::new(_tracing_context.extract().span().span_context().clone(), vec![])]);
let span = tracer.build_with_context(span_builder, &Context::default());
let cx = Context::current_with_span(span);
// create a span to close the previous trace and link it to the new trace
let span_builder = SpanBuilder::from_name("TransportMessage::end_trace")
.with_links(vec![Link::new(cx.span().span_context().clone(), vec![])]);
let _ = tracer.build_with_context(span_builder, &_tracing_context.extract());
// create the new opentelemetry context
let tracing_context = OpenTelemetryContext::inject(&cx);
Self {
tracing_context: Some(tracing_context.to_string()),
..self
}
} else {
self
}
}
}
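/// Return the tracing context carried by this message, or the current context if none is set
/// (or if the `tracing_context` feature is disabled).
#[cfg(feature = "std")]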
pub fn tracing_context(&self) -> OpenTelemetryContext {
cfg_if! {
if #[cfg(feature = "tracing_context")] {
match self.tracing_context.as_ref() {
Some(tracing_context) => OpenTelemetryContext::from_remote_context(tracing_context),
None => OpenTelemetryContext::current(),
}
} else {
OpenTelemetryContext::current()
}
}
}
}
impl Display for TransportMessage {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Message (onward route: {}, return route: {})",
self.onward_route, self.return_route
)
}
}