#[cfg(feature = "std")]
use crate::OpenTelemetryContext;
use crate::{
compat::vec::Vec, deserialize, route, serialize, Address, Decodable, Encodable, Encoded,
Message, Route, TransportMessage,
};
use crate::{LocalInfo, Result};
use cfg_if::cfg_if;
use serde::{Deserialize, Serialize};
/// A message in its local form: an onward route, a return route and a payload,
/// together with a list of [`LocalInfo`] entries and, when the `std` feature is
/// enabled, an [`OpenTelemetryContext`].
///
/// `into_transport_message` builds a [`TransportMessage`] from the two routes and
/// the payload only; `local_info` is not passed along by that conversion.
#[derive(Serialize, Deserialize, Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Message)]
pub struct LocalMessage {
/// Onward route of the message.
pub onward_route: Route,
/// Return route of the message.
pub return_route: Route,
/// Raw payload bytes carried by the message.
pub payload: Vec<u8>,
/// [`LocalInfo`] entries attached to this message.
pub local_info: Vec<LocalInfo>,
/// Tracing context attached to this message (only present with the `std` feature).
#[cfg(feature = "std")]
pub tracing_context: OpenTelemetryContext,
}
impl LocalMessage {
pub fn onward_route(&self) -> &Route {
&self.onward_route
}
pub fn next_on_onward_route(&self) -> Result<&Address> {
self.onward_route.next()
}
pub fn has_next_on_onward_route(&self) -> bool {
self.onward_route.next().is_ok()
}
pub fn pop_front_onward_route(mut self) -> Result<Self> {
let _ = self.onward_route.step()?;
Ok(self)
}
pub fn push_front_onward_route(mut self, address: Address) -> Self {
self.onward_route = address + self.onward_route;
self
}
pub fn replace_front_onward_route(self, address: Address) -> Result<Self> {
Ok(self
.pop_front_onward_route()?
.push_front_onward_route(address))
}
pub fn prepend_front_onward_route(mut self, route: Route) -> Self {
self.onward_route = route + self.onward_route;
self
}
pub fn set_onward_route(mut self, route: Route) -> Self {
self.onward_route = route;
self
}
pub fn return_route(&self) -> &Route {
&self.return_route
}
pub fn set_return_route(mut self, route: Route) -> Self {
self.return_route = route;
self
}
pub fn push_front_return_route(mut self, address: Address) -> Self {
self.return_route = address + self.return_route;
self
}
pub fn prepend_front_return_route(mut self, route: Route) -> Self {
self.return_route = route + self.return_route;
self
}
pub fn step_forward(self, address: Address) -> Result<Self> {
Ok(self
.pop_front_onward_route()?
.push_front_return_route(address))
}
pub fn into_payload(self) -> Vec<u8> {
self.payload
}
pub fn payload(&self) -> &[u8] {
&self.payload
}
pub fn payload_mut(&mut self) -> &mut Vec<u8> {
&mut self.payload
}
pub fn set_payload(mut self, payload: Vec<u8>) -> Self {
self.payload = payload;
self
}
pub fn local_info(&self) -> &[LocalInfo] {
&self.local_info
}
pub fn local_info_mut(&mut self) -> &mut Vec<LocalInfo> {
&mut self.local_info
}
pub fn clear_local_info(&mut self) {
self.local_info.clear()
}
#[cfg(feature = "std")]
pub fn tracing_context(&self) -> OpenTelemetryContext {
self.tracing_context.clone()
}
pub fn from_transport_message(transport_message: TransportMessage) -> LocalMessage {
cfg_if! {
if #[cfg(feature = "std")] {
LocalMessage::new()
.with_tracing_context(transport_message.tracing_context())
.with_onward_route(transport_message.onward_route)
.with_return_route(transport_message.return_route)
.with_payload(transport_message.payload)
} else {
LocalMessage::new()
.with_onward_route(transport_message.onward_route)
.with_return_route(transport_message.return_route)
.with_payload(transport_message.payload)
}
}
}
pub fn into_transport_message(self) -> TransportMessage {
let transport_message = TransportMessage::new(
1,
self.onward_route,
self.return_route,
self.payload,
None,
);
cfg_if! {
if #[cfg(feature = "std")] {
let new_tracing_context = Self::start_new_tracing_context(self.tracing_context.update(), "TransportMessage");
transport_message.with_tracing_context(new_tracing_context)
} else {
transport_message
}
}
}
#[cfg(feature = "std")]
pub fn start_new_tracing_context(
tracing_context: OpenTelemetryContext,
span_prefix: &str,
) -> String {
use crate::OCKAM_TRACER_NAME;
use opentelemetry::trace::{Link, SpanBuilder, TraceContextExt, Tracer};
use opentelemetry::{global, Context};
let tracer = global::tracer(OCKAM_TRACER_NAME);
let span_builder = SpanBuilder::from_name(format!("{}::start_trace", span_prefix))
.with_links(vec![Link::new(
tracing_context.extract().span().span_context().clone(),
vec![],
0,
)]);
let span = tracer.build_with_context(span_builder, &Context::default());
let cx = Context::current_with_span(span);
let span_builder = SpanBuilder::from_name(format!("{}::end_trace", span_prefix))
.with_links(vec![Link::new(cx.span().span_context().clone(), vec![], 0)]);
let _ = tracer.build_with_context(span_builder, &tracing_context.extract());
let new_tracing_context = OpenTelemetryContext::inject(&cx);
new_tracing_context.to_string()
}
}
impl Default for LocalMessage {
fn default() -> Self {
Self::new()
}
}
impl Encodable for LocalMessage {
    /// Encodes the message by handing it to the crate's `serialize` helper.
    ///
    /// # Errors
    /// Returns the error reported by `serialize`.
    fn encode(self) -> Result<Encoded> {
        let encoded = serialize(self)?;
        Ok(encoded)
    }
}
impl Decodable for LocalMessage {
    /// Decodes a message from the bytes in `e` using the crate's `deserialize` helper.
    ///
    /// # Errors
    /// Returns the error reported by `deserialize`.
    fn decode(e: &[u8]) -> Result<Self> {
        let message: Self = deserialize(e)?;
        Ok(message)
    }
}
impl LocalMessage {
fn make(
onward_route: Route,
return_route: Route,
payload: Vec<u8>,
local_info: Vec<LocalInfo>,
) -> Self {
LocalMessage {
onward_route,
return_route,
payload,
local_info,
#[cfg(feature = "std")]
tracing_context: OpenTelemetryContext::current(),
}
}
pub fn new() -> Self {
LocalMessage::make(route![], route![], vec![], vec![])
}
pub fn with_onward_route(self, onward_route: Route) -> Self {
Self {
onward_route,
..self
}
}
pub fn with_return_route(self, return_route: Route) -> Self {
Self {
return_route,
..self
}
}
pub fn with_payload(self, payload: Vec<u8>) -> Self {
Self { payload, ..self }
}
pub fn with_local_info(self, local_info: Vec<LocalInfo>) -> Self {
Self { local_info, ..self }
}
#[cfg(feature = "std")]
pub fn with_tracing_context(self, tracing_context: OpenTelemetryContext) -> Self {
Self {
tracing_context,
..self
}
}
}