opentelemetry_zipkin/exporter/
mod.rs1mod env;
2mod model;
3mod uploader;
4
5use http::Uri;
6use model::endpoint::Endpoint;
7use opentelemetry_http::HttpClient;
8use opentelemetry_sdk::error::OTelSdkResult;
9use opentelemetry_sdk::trace;
10use std::net::{AddrParseError, SocketAddr};
11use std::sync::Arc;
12
/// Span exporter that converts span data to Zipkin spans and uploads them.
///
/// Instances are obtained through `ZipkinExporter::builder()` followed by
/// `ZipkinExporterBuilder::build()`.
#[derive(Debug)]
pub struct ZipkinExporter {
    /// Endpoint cloned and passed to `model::into_zipkin_span` for every exported span.
    local_endpoint: Endpoint,
    /// Uploader constructed from the HTTP client and collector URI supplied to `new`.
    uploader: uploader::Uploader,
}
19
20impl ZipkinExporter {
21 pub fn builder() -> ZipkinExporterBuilder {
23 ZipkinExporterBuilder::default()
24 }
25
26 fn new(local_endpoint: Endpoint, client: Arc<dyn HttpClient>, collector_endpoint: Uri) -> Self {
27 ZipkinExporter {
28 local_endpoint,
29 uploader: uploader::Uploader::new(client, collector_endpoint),
30 }
31 }
32}
33
/// Builder used to configure and create a [`ZipkinExporter`].
#[derive(Debug)]
pub struct ZipkinExporterBuilder {
    /// Optional socket address passed to `Endpoint::new` when the exporter is built.
    service_addr: Option<SocketAddr>,
    /// Collector endpoint kept as a string; it is only parsed into a URI in `build`.
    collector_endpoint: String,
    /// HTTP client handed to the exporter; `build` fails when this is `None`.
    client: Option<Arc<dyn HttpClient>>,
}
41
42impl Default for ZipkinExporterBuilder {
43 fn default() -> Self {
44 #[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client"))]
45 let timeout = env::get_timeout();
46
47 ZipkinExporterBuilder {
48 #[cfg(feature = "reqwest-blocking-client")]
49 client: Some(Arc::new(
50 reqwest::blocking::Client::builder()
51 .timeout(timeout)
52 .build()
53 .unwrap_or_else(|_| reqwest::blocking::Client::new()),
54 )),
55 #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))]
56 client: Some(Arc::new(
57 reqwest::Client::builder()
58 .timeout(timeout)
59 .build()
60 .unwrap_or_else(|_| reqwest::Client::new()),
61 )),
62 #[cfg(all(
63 not(feature = "reqwest-client"),
64 not(feature = "reqwest-blocking-client")
65 ))]
66 client: None,
67
68 service_addr: None,
69 collector_endpoint: env::get_endpoint(),
70 }
71 }
72}
73
74impl ZipkinExporterBuilder {
75 pub fn build(self) -> Result<ZipkinExporter, ExporterBuildError> {
79 let endpoint = Endpoint::new(self.service_addr);
80
81 if let Some(client) = self.client {
82 let exporter = ZipkinExporter::new(
83 endpoint,
84 client,
85 self.collector_endpoint
86 .parse()
87 .map_err(ExporterBuildError::InvalidUri)?,
88 );
89 Ok(exporter)
90 } else {
91 Err(ExporterBuildError::NoHttpClient)
92 }
93 }
94
95 pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
101 self.client = Some(Arc::new(client));
102 self
103 }
104
105 pub fn with_service_address(mut self, addr: SocketAddr) -> Self {
107 self.service_addr = Some(addr);
108 self
109 }
110
111 pub fn with_collector_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
116 self.collector_endpoint = endpoint.into();
117 self
118 }
119}
120
121async fn zipkin_export(
122 batch: Vec<trace::SpanData>,
123 uploader: uploader::Uploader,
124 local_endpoint: Endpoint,
125) -> OTelSdkResult {
126 let zipkin_spans = batch
127 .into_iter()
128 .map(|span| model::into_zipkin_span(local_endpoint.clone(), span))
129 .collect();
130
131 uploader.upload(zipkin_spans).await
132}
133
134impl trace::SpanExporter for ZipkinExporter {
135 async fn export(&self, batch: Vec<trace::SpanData>) -> OTelSdkResult {
137 zipkin_export(batch, self.uploader.clone(), self.local_endpoint.clone()).await
138 }
139}
140
141#[derive(thiserror::Error, Debug)]
143#[non_exhaustive]
144pub enum ExporterBuildError {
145 #[error("http client must be set, users can enable reqwest feature to use http client implementation within create")]
147 NoHttpClient,
148
149 #[error("invalid uri")]
151 InvalidUri(#[from] http::uri::InvalidUri),
152
153 #[error("invalid address")]
155 InvalidAddress(#[from] AddrParseError),
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::exporter::env::ENV_ENDPOINT;

    /// An endpoint set in code must win over one supplied through the environment.
    #[test]
    fn test_priority_of_code_based_config_over_envs_for_endpoint() {
        let from_env = Some("http://127.0.0.1:1234");
        temp_env::with_vars([(ENV_ENDPOINT, from_env)], || {
            let ZipkinExporterBuilder {
                collector_endpoint, ..
            } = ZipkinExporterBuilder::default().with_collector_endpoint("http://127.0.0.1:2345");
            assert_eq!(collector_endpoint, "http://127.0.0.1:2345");
        });
    }

    /// With the environment variable unset, the built-in default endpoint is used.
    #[test]
    fn test_use_default_when_others_missing_for_endpoint() {
        let unset: Option<&str> = None;
        temp_env::with_vars([(ENV_ENDPOINT, unset)], || {
            let ZipkinExporterBuilder {
                collector_endpoint, ..
            } = ZipkinExporterBuilder::default();
            assert_eq!(collector_endpoint, "http://127.0.0.1:9411/api/v2/spans");
        });
    }
}