1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#[cfg(feature = "std")]
use crate::OpenTelemetryContext;
#[cfg(feature = "tracing_context")]
use crate::OCKAM_TRACER_NAME;
use crate::{compat::vec::Vec, Message, Route};
#[cfg(feature = "std")]
use cfg_if::cfg_if;
use core::fmt::{self, Display, Formatter};
#[cfg(feature = "tracing_context")]
use opentelemetry::{
    global,
    trace::{Link, SpanBuilder, TraceContextExt, Tracer},
    Context,
};
use serde::{Deserialize, Serialize};

/// A generic transport message type.
///
/// This type is exposed in `ockam_core` (and the root `ockam` crate) in
/// order to provide a mechanism for third-party developers to create
/// custom transport channel routers.
///
/// Casual users of Ockam should never have to interact with this type
/// directly.
///
/// # Examples
///
/// See `ockam_transport_tcp::workers::sender::TcpSendWorker` for a usage example.
///
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Message)]
pub struct TransportMessage {
    /// The transport protocol version.
    pub version: u8,
    /// Onward message route.
    pub onward_route: Route,
    /// Return message route.
    ///
    /// This field must be populated by routers handling this message
    /// along the way.
    pub return_route: Route,
    /// The message payload.
    pub payload: Vec<u8>,
    /// An optional tracing context
    #[cfg(feature = "tracing_context")]
    pub tracing_context: Option<String>,
}

impl TransportMessage {
    /// Create a new v1 transport message with empty return route.
    pub fn v1(
        onward_route: impl Into<Route>,
        return_route: impl Into<Route>,
        payload: Vec<u8>,
    ) -> Self {
        Self {
            version: 1,
            onward_route: onward_route.into(),
            return_route: return_route.into(),
            payload,
            #[cfg(feature = "tracing_context")]
            tracing_context: None,
        }
    }

    /// Return a TransportMessage with a new tracing context:
    ///    - A new trace is started
    ///    - The previous trace and the new trace are linked together
    ///
    /// We start a new trace here in order to make sure that each transport message is always
    /// associated to a globally unique trace id and then cannot be correlated with another transport
    /// message that would leave the same node for example.
    ///
    /// We can still navigate the two created traces as one thanks to their link.
    #[cfg(feature = "std")]
    pub fn start_new_tracing_context(self, _tracing_context: OpenTelemetryContext) -> Self {
        cfg_if! {
            if #[cfg(feature = "tracing_context")] {
                // start a new trace for this transport message, and link it to the previous trace, via the current tracing context
                let tracer = global::tracer(OCKAM_TRACER_NAME);
                let span_builder = SpanBuilder::from_name("TransportMessage::start_trace")
                      .with_links(vec![Link::new(_tracing_context.extract().span().span_context().clone(), vec![])]);
                let span = tracer.build_with_context(span_builder, &Context::default());
                let cx = Context::current_with_span(span);

                // create a span to close the previous trace and link it to the new trace
                let span_builder = SpanBuilder::from_name("TransportMessage::end_trace")
                                 .with_links(vec![Link::new(cx.span().span_context().clone(), vec![])]);
                let _ = tracer.build_with_context(span_builder, &_tracing_context.extract());

                // create the new opentelemetry context
                let tracing_context = OpenTelemetryContext::inject(&cx);

                Self {
                    tracing_context: Some(tracing_context.to_string()),
                    ..self
                }
            } else {
                self
            }
        }
    }

    /// Return the tracing context
    #[cfg(feature = "tracing_context")]
    pub fn tracing_context(&self) -> OpenTelemetryContext {
        cfg_if! {
            if #[cfg(feature = "tracing_context")] {
                match self.tracing_context.as_ref() {
                    Some(tracing_context) => OpenTelemetryContext::from_remote_context(tracing_context),
                    None => OpenTelemetryContext::current(),
                }
            } else {
                OpenTelemetryContext::current()
            }
        }
    }
}

impl Display for TransportMessage {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(
            f,
            "Message (onward route: {}, return route: {})",
            self.onward_route, self.return_route
        )
    }
}