1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//! Trace exporters
use crate::{api, sdk};
use async_trait::async_trait;
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
#[cfg(all(feature = "http", feature = "reqwest"))]
use std::convert::TryInto;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
#[cfg(feature = "http")]
use {http::Request, std::error::Error};

pub mod stdout;

/// Outcome reported by an exporter for one batch.
///
/// Failures are split into two variants so the caller can tell whether
/// retrying the same batch is permitted.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ExportResult {
    /// The batch was exported successfully.
    Success,
    /// The export failed and the caller must not retry it.
    FailedNotRetryable,
    /// The export failed transiently; the caller should record the error and
    /// may retry.
    FailedRetryable,
}

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
///
/// The goals of the interface are:
///
/// - Minimize burden of implementation for protocol-dependent telemetry
///   exporters. The protocol exporter is expected to be primarily a simple
///   telemetry data encoder and transmitter.
/// - Allow implementing helpers as composable components that use the same
///   chainable Exporter interface. SDK authors are encouraged to implement
///   common functionality such as queuing, batching, tagging, etc. as helpers.
///   This functionality will be applicable regardless of what protocol
///   exporter is used.
#[async_trait]
pub trait SpanExporter: Send + Sync + std::fmt::Debug {
    /// Exports a batch of telemetry data. Protocol exporters that will implement
    /// this function are typically expected to serialize and transmit the data
    /// to the destination.
    ///
    /// The exporter takes ownership of `batch` and reports the outcome as an
    /// [`ExportResult`].
    ///
    /// This function will never be called concurrently for the same exporter
    /// instance. It can be called again only after the current call returns.
    ///
    /// This function must not block indefinitely; there must be a reasonable
    /// upper limit after which the call must time out with an error result.
    async fn export(&self, batch: Vec<SpanData>) -> ExportResult;

    /// Shuts down the exporter. Called when SDK is shut down. This is an
    /// opportunity for exporter to do any cleanup required.
    ///
    /// `shutdown` should be called only once for each Exporter instance. After
    /// the call to `shutdown`, subsequent calls to `export` are not allowed
    /// and should return an error.
    ///
    /// Shutdown should not block indefinitely (e.g. if it attempts to flush the
    /// data and the destination is unavailable). SDK authors can
    /// decide if they want to make the shutdown timeout to be configurable.
    ///
    /// The default implementation does nothing.
    fn shutdown(&mut self) {}
}

/// A minimal interface necessary for exporting spans over HTTP.
///
/// Users sometimes choose HTTP clients that rely on a certain runtime. This
/// trait allows users to bring their own choice of HTTP client.
#[cfg(feature = "http")]
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
    /// Send a batch of spans to collectors.
    ///
    /// `request` is a complete HTTP request whose body is the raw bytes to
    /// transmit. Returns the [`ExportResult`] for the request, or a boxed
    /// error when the client could not carry the request out.
    async fn send(&self, request: Request<Vec<u8>>) -> Result<ExportResult, Box<dyn Error>>;
}

/// `SpanData` contains all the information collected by a `Span` and can be used
/// by exporters as a standard input.
///
/// With the `serialize` feature enabled the struct also derives serde's
/// `Serialize` and `Deserialize`.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
#[derive(Clone, Debug, PartialEq)]
pub struct SpanData {
    /// Exportable `SpanReference`
    pub span_reference: api::trace::SpanReference,
    /// Id of the parent span
    pub parent_span_id: api::trace::SpanId,
    /// Span kind
    pub span_kind: api::trace::SpanKind,
    /// Span name
    pub name: String,
    /// Span start time
    pub start_time: SystemTime,
    /// Span end time
    pub end_time: SystemTime,
    /// Span attributes
    pub attributes: sdk::trace::EvictedHashMap,
    /// Span message events
    pub message_events: sdk::trace::EvictedQueue<api::trace::Event>,
    /// Span links
    pub links: sdk::trace::EvictedQueue<api::trace::Link>,
    /// Span status code
    pub status_code: api::trace::StatusCode,
    /// Span status message
    pub status_message: String,
    /// Resource contains attributes representing an entity that produced this span.
    pub resource: Arc<sdk::Resource>,
    /// Instrumentation library that produced this span.
    ///
    /// Skipped by serde when the `serialize` feature is enabled, so it is not
    /// part of the serialized form.
    #[cfg_attr(feature = "serialize", serde(skip))]
    pub instrumentation_lib: sdk::InstrumentationLibrary,
}

#[cfg(all(feature = "reqwest", feature = "http"))]
#[async_trait]
impl HttpClient for reqwest::Client {
    /// Converts `request` into a `reqwest` request, executes it on this client
    /// and maps the response status to an `ExportResult`.
    ///
    /// A failed conversion or a failed execution is returned as `Err`. Any
    /// response whose status is not a success yields
    /// `Ok(ExportResult::FailedNotRetryable)`.
    async fn send(&self, request: Request<Vec<u8>>) -> Result<ExportResult, Box<dyn Error>> {
        let outbound: reqwest::Request = request.try_into()?;
        let response = self.execute(outbound).await?;

        Ok(if response.status().is_success() {
            ExportResult::Success
        } else {
            ExportResult::FailedNotRetryable
        })
    }
}

#[cfg(all(feature = "reqwest", feature = "http"))]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
    /// Converts `request` into a blocking `reqwest` request, executes it and
    /// maps the response status to an `ExportResult`.
    ///
    /// A failed conversion or a failed execution is returned as `Err`. Any
    /// response whose status is not a success yields
    /// `Ok(ExportResult::FailedNotRetryable)`.
    async fn send(&self, request: Request<Vec<u8>>) -> Result<ExportResult, Box<dyn Error>> {
        let outbound: reqwest::blocking::Request = request.try_into()?;

        // `execute` is not awaited here: the response is obtained synchronously
        // inside this async fn.
        // NOTE(review): presumably this should not be driven from an async
        // executor thread; confirm against the reqwest blocking client docs.
        let response = self.execute(outbound)?;

        if !response.status().is_success() {
            return Ok(ExportResult::FailedNotRetryable);
        }
        Ok(ExportResult::Success)
    }
}

#[cfg(all(feature = "surf", feature = "http"))]
#[async_trait]
impl HttpClient for surf::Client {
    /// Rebuilds `request` as a `surf` request, sends it with this client and
    /// maps the response status to an `ExportResult`.
    ///
    /// Only the URI and the body of `request` are carried over: the outgoing
    /// request is always a POST with content type `application/json`.
    ///
    /// A URI that fails to parse or a failed send is returned as `Err`. Any
    /// response whose status is not a success yields
    /// `Ok(ExportResult::FailedNotRetryable)`.
    async fn send(&self, request: Request<Vec<u8>>) -> Result<ExportResult, Box<dyn Error>> {
        let (head, payload) = request.into_parts();
        // Round-trip through a string to obtain the URL type surf expects.
        let target = head.uri.to_string().parse()?;

        let outbound = surf::Request::builder(surf::http::Method::Post, target)
            .content_type("application/json")
            .body(payload);
        let response = self.send(outbound).await?;

        let outcome = if response.status().is_success() {
            ExportResult::Success
        } else {
            ExportResult::FailedNotRetryable
        };
        Ok(outcome)
    }
}

#[cfg(feature = "serialize")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::trace::TraceState;

    /// Encodes a `SpanData` with bincode, decodes it again and checks that the
    /// decoded value compares equal to the one that was encoded.
    #[test]
    fn test_serialise() {
        // Shared capacity for the attribute map and both queues.
        let max_entries = 3;

        let original = SpanData {
            span_reference: api::trace::SpanReference::new(
                api::trace::TraceId::from_u128(7),
                api::trace::SpanId::from_u64(99),
                0,     // trace flags
                false, // remote
                TraceState::default(),
            ),
            parent_span_id: api::trace::SpanId::from_u64(1),
            span_kind: api::trace::SpanKind::Client,
            // Non-ASCII characters are included in the name on purpose.
            name: "foo/bar baz 人?!".to_string(),
            start_time: SystemTime::now(),
            end_time: SystemTime::now(),
            attributes: sdk::trace::EvictedHashMap::new(max_entries),
            message_events: sdk::trace::EvictedQueue::new(max_entries),
            links: sdk::trace::EvictedQueue::new(max_entries),
            status_code: api::trace::StatusCode::OK,
            status_message: String::new(),
            resource: Arc::new(sdk::Resource::default()),
            // This field is marked `serde(skip)` on `SpanData`, so it is not in
            // the encoded bytes; the value used here has to match whatever
            // deserialization fills in for the final equality check to hold.
            instrumentation_lib: sdk::InstrumentationLibrary::new("", None),
        };

        let bytes: Vec<u8> = bincode::serialize(&original).unwrap();
        let round_tripped: SpanData = bincode::deserialize(&bytes[..]).unwrap();

        assert_eq!(original, round_tripped);
    }
}