opentelemetry_zipkin/exporter/
mod.rs

1mod env;
2mod model;
3mod uploader;
4
5use http::Uri;
6use model::endpoint::Endpoint;
7use opentelemetry_http::HttpClient;
8use opentelemetry_sdk::error::OTelSdkResult;
9use opentelemetry_sdk::trace;
10use std::net::{AddrParseError, SocketAddr};
11use std::sync::Arc;
12
13/// Zipkin span exporter
14#[derive(Debug)]
15pub struct ZipkinExporter {
16    local_endpoint: Endpoint,
17    uploader: uploader::Uploader,
18}
19
20impl ZipkinExporter {
21    /// Get a builder to configure a [ZipkinExporter]
22    pub fn builder() -> ZipkinExporterBuilder {
23        ZipkinExporterBuilder::default()
24    }
25
26    fn new(local_endpoint: Endpoint, client: Arc<dyn HttpClient>, collector_endpoint: Uri) -> Self {
27        ZipkinExporter {
28            local_endpoint,
29            uploader: uploader::Uploader::new(client, collector_endpoint),
30        }
31    }
32}
33
34/// Builder for `ZipkinExporter` struct.
35#[derive(Debug)]
36pub struct ZipkinExporterBuilder {
37    service_addr: Option<SocketAddr>,
38    collector_endpoint: String,
39    client: Option<Arc<dyn HttpClient>>,
40}
41
42impl Default for ZipkinExporterBuilder {
43    fn default() -> Self {
44        #[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client"))]
45        let timeout = env::get_timeout();
46
47        ZipkinExporterBuilder {
48            #[cfg(feature = "reqwest-blocking-client")]
49            client: Some(Arc::new(
50                reqwest::blocking::Client::builder()
51                    .timeout(timeout)
52                    .build()
53                    .unwrap_or_else(|_| reqwest::blocking::Client::new()),
54            )),
55            #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
56            client: Some(Arc::new(
57                reqwest::Client::builder()
58                    .timeout(timeout)
59                    .build()
60                    .unwrap_or_else(|_| reqwest::Client::new()),
61            )),
62            #[cfg(all(
63                not(feature = "reqwest-client"),
64                not(feature = "reqwest-blocking-client")
65            ))]
66            client: None,
67
68            service_addr: None,
69            collector_endpoint: env::get_endpoint(),
70        }
71    }
72}
73
74impl ZipkinExporterBuilder {
75    /// Creates a new [ZipkinExporter] from this configuration.
76    ///
77    /// Returns error if the endpoint is not valid or if no http client is provided.
78    pub fn build(self) -> Result<ZipkinExporter, ExporterBuildError> {
79        let endpoint = Endpoint::new(self.service_addr);
80
81        if let Some(client) = self.client {
82            let exporter = ZipkinExporter::new(
83                endpoint,
84                client,
85                self.collector_endpoint
86                    .parse()
87                    .map_err(ExporterBuildError::InvalidUri)?,
88            );
89            Ok(exporter)
90        } else {
91            Err(ExporterBuildError::NoHttpClient)
92        }
93    }
94
95    /// Assign client implementation
96    ///
97    /// When using this method, the export timeout will depend on the provided
98    /// client implementation and may not respect the timeout set via the
99    /// environment variable `OTEL_EXPORTER_ZIPKIN_TIMEOUT`.
100    pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
101        self.client = Some(Arc::new(client));
102        self
103    }
104
105    /// Assign the service address.
106    pub fn with_service_address(mut self, addr: SocketAddr) -> Self {
107        self.service_addr = Some(addr);
108        self
109    }
110
111    /// Assign the Zipkin collector endpoint
112    ///
113    /// Note: Programmatically setting this will override any value
114    /// set via the environment variable `OTEL_EXPORTER_ZIPKIN_ENDPOINT`.
115    pub fn with_collector_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
116        self.collector_endpoint = endpoint.into();
117        self
118    }
119}
120
121async fn zipkin_export(
122    batch: Vec<trace::SpanData>,
123    uploader: uploader::Uploader,
124    local_endpoint: Endpoint,
125) -> OTelSdkResult {
126    let zipkin_spans = batch
127        .into_iter()
128        .map(|span| model::into_zipkin_span(local_endpoint.clone(), span))
129        .collect();
130
131    uploader.upload(zipkin_spans).await
132}
133
134impl trace::SpanExporter for ZipkinExporter {
135    /// Export spans to Zipkin collector.
136    async fn export(&self, batch: Vec<trace::SpanData>) -> OTelSdkResult {
137        zipkin_export(batch, self.uploader.clone(), self.local_endpoint.clone()).await
138    }
139}
140
141/// Wrap type for errors from opentelemetry zipkin
142#[derive(thiserror::Error, Debug)]
143#[non_exhaustive]
144pub enum ExporterBuildError {
145    /// No http client implementation found. User should provide one or enable features.
146    #[error("http client must be set, users can enable reqwest feature to use http client implementation within create")]
147    NoHttpClient,
148
149    /// The uri provided is invalid
150    #[error("invalid uri")]
151    InvalidUri(#[from] http::uri::InvalidUri),
152
153    /// The IP/socket address provided is invalid
154    #[error("invalid address")]
155    InvalidAddress(#[from] AddrParseError),
156}
157
158#[cfg(test)]
159mod tests {
160    use super::*;
161    use crate::exporter::env::ENV_ENDPOINT;
162
163    #[test]
164    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
165        temp_env::with_vars([(ENV_ENDPOINT, Some("http://127.0.0.1:1234"))], || {
166            let builder =
167                ZipkinExporterBuilder::default().with_collector_endpoint("http://127.0.0.1:2345");
168            assert_eq!(builder.collector_endpoint, "http://127.0.0.1:2345");
169        });
170    }
171
172    #[test]
173    fn test_use_default_when_others_missing_for_endpoint() {
174        temp_env::with_vars([(ENV_ENDPOINT, None::<&str>)], || {
175            let builder = ZipkinExporterBuilder::default();
176            assert_eq!(
177                builder.collector_endpoint,
178                "http://127.0.0.1:9411/api/v2/spans"
179            );
180        });
181    }
182}