opentelemetry_jaeger/exporter/config/agent.rs
1use std::borrow::BorrowMut;
2use std::net::ToSocketAddrs;
3use std::sync::Arc;
4use std::{env, net};
5
6use opentelemetry::trace::TraceError;
7use opentelemetry_sdk::trace::{BatchConfig, Config, TracerProvider};
8use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
9
10use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp};
11use crate::exporter::config::{
12 build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig,
13 TransformationConfig,
14};
15use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
16use crate::{Error, Exporter, JaegerTraceRuntime};
17
/// Upper bound, in bytes, for a single UDP packet sent to the agent.
/// Kept in sync with the limit used by jaeger-agent.
const UDP_PACKET_MAX_LENGTH: usize = 65_000;

/// Environment variable naming the Jaeger agent host, e.g. "localhost".
const ENV_AGENT_HOST: &str = "OTEL_EXPORTER_JAEGER_AGENT_HOST";

/// Environment variable naming the Jaeger agent port, e.g. 6832.
const ENV_AGENT_PORT: &str = "OTEL_EXPORTER_JAEGER_AGENT_PORT";

/// Host used when no agent host is configured.
const DEFAULT_AGENT_ENDPOINT_HOST: &str = "127.0.0.1";

/// Port used when no agent port is configured.
const DEFAULT_AGENT_ENDPOINT_PORT: &str = "6831";
34
35/// Deprecation Notice:
36/// Ingestion of OTLP is now supported in Jaeger please check [crates.io] for more details.
37///
38/// AgentPipeline config and build a exporter targeting a jaeger agent using UDP as transport layer protocol.
39///
40/// ## UDP packet max length
41/// The exporter uses UDP to communicate with the agent. UDP requests may be rejected if it's too long.
42/// See [UDP packet size] for details.
43///
44/// Users can utilise [`with_max_packet_size`] and [`with_auto_split_batch`] to avoid spans loss or UDP requests failure.
45///
46/// The default `max_packet_size` is `65000`([why 65000]?). If your platform has a smaller limit on UDP packet.
47/// You will need to adjust the `max_packet_size` accordingly.
48///
49/// Set `auto_split_batch` to true will config the exporter to split the batch based on `max_packet_size`
50/// automatically. Note that it has a performance overhead as every batch could require multiple requests to export.
51///
52/// For example, OSX UDP packet limit is 9216 by default. You can configure the pipeline as following
53/// to avoid UDP packet breaches the limit.
54///
55/// ```no_run
56/// # use opentelemetry_sdk::runtime::Tokio;
57/// # fn main() {
58/// let tracer = opentelemetry_jaeger::new_agent_pipeline()
59/// .with_endpoint("localhost:6831")
60/// .with_service_name("my_app")
61/// .with_max_packet_size(9_216)
62/// .with_auto_split_batch(true)
63/// .install_batch(Tokio).unwrap();
64/// # }
65/// ```
66///
67/// [`with_auto_split_batch`]: AgentPipeline::with_auto_split_batch
68/// [`with_max_packet_size`]: AgentPipeline::with_max_packet_size
69/// [UDP packet size]: https://stackoverflow.com/questions/1098897/what-is-the-largest-safe-udp-packet-size-on-the-internet
70/// [why 65000]: https://serverfault.com/questions/246508/how-is-the-mtu-is-65535-in-udp-but-ethernet-does-not-allow-frame-size-more-than
71/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
72///
73/// ## Environment variables
74/// The following environment variables are available to configure the agent exporter.
75///
76/// - `OTEL_EXPORTER_JAEGER_AGENT_HOST`, set the host of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_HOST`
77/// is not set, the value will be ignored.
78/// - `OTEL_EXPORTER_JAEGER_AGENT_PORT`, set the port of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_HOST`
79/// is not set, the exporter will use 127.0.0.1 as the host.
80#[derive(Debug)]
81#[deprecated(
82 since = "0.21.0",
83 note = "Please migrate to opentelemetry-otlp exporter."
84)]
85pub struct AgentPipeline {
86 transformation_config: TransformationConfig,
87 trace_config: Option<Config>,
88 batch_config: Option<BatchConfig>,
89 agent_endpoint: Option<String>,
90 max_packet_size: usize,
91 auto_split_batch: bool,
92}
93
94impl Default for AgentPipeline {
95 fn default() -> Self {
96 AgentPipeline {
97 transformation_config: Default::default(),
98 trace_config: Default::default(),
99 batch_config: Some(Default::default()),
100 agent_endpoint: Some(format!(
101 "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
102 )),
103 max_packet_size: UDP_PACKET_MAX_LENGTH,
104 auto_split_batch: false,
105 }
106 }
107}
108
109// implement the seal trait
110impl HasRequiredConfig for AgentPipeline {
111 fn set_transformation_config<T>(&mut self, f: T)
112 where
113 T: FnOnce(&mut TransformationConfig),
114 {
115 f(self.transformation_config.borrow_mut())
116 }
117
118 fn set_trace_config(&mut self, config: Config) {
119 self.trace_config = Some(config)
120 }
121
122 fn set_batch_config(&mut self, config: BatchConfig) {
123 self.batch_config = Some(config)
124 }
125}
126
127/// Start a new pipeline to configure a exporter that target a jaeger agent.
128///
129/// See details for each configurations at [`AgentPipeline`]
130///
131/// Deprecation Notice:
132/// Ingestion of OTLP is now supported in Jaeger please check [crates.io] for more details.
133///
134/// [`AgentPipeline`]: crate::config::agent::AgentPipeline
135/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
136#[deprecated(
137 since = "0.21.0",
138 note = "Please migrate to opentelemetry-otlp exporter."
139)]
140pub fn new_agent_pipeline() -> AgentPipeline {
141 AgentPipeline::default()
142}
143
144impl AgentPipeline {
145 /// set the endpoint of the agent.
146 ///
147 /// It usually composed by host ip and the port number.
148 /// Any valid socket address can be used.
149 ///
150 /// Default to be `127.0.0.1:6831`.
151 pub fn with_endpoint<T: Into<String>>(self, agent_endpoint: T) -> Self {
152 AgentPipeline {
153 agent_endpoint: Some(agent_endpoint.into()),
154 ..self
155 }
156 }
157
158 /// Assign the max packet size in bytes.
159 ///
160 /// It should be consistent with the limit of platforms. Otherwise, UDP requests maybe reject with
161 /// error like `thrift agent failed with transport error` or `thrift agent failed with message too long`.
162 ///
163 /// The exporter will cut off spans if the batch is long. To avoid this, set [auto_split_batch](AgentPipeline::with_auto_split_batch) to `true`
164 /// to split a batch into multiple UDP packets.
165 ///
166 /// Default to be `65000`.
167 pub fn with_max_packet_size(self, max_packet_size: usize) -> Self {
168 AgentPipeline {
169 max_packet_size,
170 ..self
171 }
172 }
173
174 /// Config whether to auto split batches.
175 ///
176 /// When auto split is set to `true`, the exporter will try to split the
177 /// batch into smaller ones so that there will be minimal data loss. It
178 /// will impact the performance.
179 ///
180 /// Note that if the length of one serialized span is longer than the `max_packet_size`.
181 /// The exporter will return an error as it cannot export the span. Use jaeger collector
182 /// instead of jaeger agent may be help in this case as the exporter will use HTTP to communicate
183 /// with jaeger collector.
184 ///
185 /// Default to be `false`.
186 pub fn with_auto_split_batch(mut self, should_auto_split: bool) -> Self {
187 self.auto_split_batch = should_auto_split;
188 self
189 }
190
191 /// Set the service name of the application. It generally is the name of application.
192 /// Critically, Jaeger backend depends on `Span.Process.ServiceName` to identify the service
193 /// that produced the spans.
194 ///
195 /// Opentelemetry allows set the service name using multiple methods.
196 /// This functions takes priority over all other methods.
197 ///
198 /// If the service name is not set. It will default to be `unknown_service`.
199 pub fn with_service_name<T: Into<String>>(mut self, service_name: T) -> Self {
200 self.set_transformation_config(|config| {
201 config.service_name = Some(service_name.into());
202 });
203 self
204 }
205
206 /// Config whether to export information of instrumentation library.
207 ///
208 /// It's required to [report instrumentation library as span tags].
209 /// However it does have a overhead on performance, performance sensitive applications can
210 /// use this function to opt out reporting instrumentation library.
211 ///
212 /// Default to be `true`.
213 ///
214 /// [report instrumentation library as span tags]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/non-otlp.md#instrumentationscope
215 pub fn with_instrumentation_library_tags(mut self, should_export: bool) -> Self {
216 self.set_transformation_config(|config| {
217 config.export_instrument_library = should_export;
218 });
219 self
220 }
221
222 /// Assign the opentelemetry SDK configurations for the exporter pipeline.
223 ///
224 /// For mapping between opentelemetry configurations and Jaeger spans. Please refer [the spec].
225 ///
226 /// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/jaeger.md#mappings
227 /// # Examples
228 /// Set service name via resource.
229 /// ```rust
230 /// use opentelemetry::KeyValue;
231 /// use opentelemetry_sdk::{Resource, trace::Config};
232 ///
233 /// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
234 /// .with_trace_config(
235 /// Config::default()
236 /// .with_resource(Resource::new(vec![KeyValue::new("service.name", "my-service")]))
237 /// );
238 ///
239 /// ```
240 pub fn with_trace_config(mut self, config: Config) -> Self {
241 self.set_trace_config(config);
242 self
243 }
244
245 /// Assign the batch span processor for the exporter pipeline.
246 ///
247 /// If a simple span processor is used by [`install_simple`][AgentPipeline::install_simple]
248 /// or [`build_simple`][AgentPipeline::install_simple], then this config will not be ignored.
249 ///
250 /// # Examples
251 /// Set max queue size.
252 /// ```rust
253 /// use opentelemetry_sdk::trace::BatchConfigBuilder;
254 ///
255 /// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
256 /// .with_batch_processor_config(
257 /// BatchConfigBuilder::default()
258 /// .with_max_queue_size(200)
259 /// .build()
260 /// );
261 ///
262 /// ```
263 pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
264 self.set_batch_config(config);
265 self
266 }
267
268 /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
269 ///
270 /// The exporter will send each span to the agent upon the span ends.
271 pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
272 let mut builder = TracerProvider::builder();
273
274 let (config, process) = build_config_and_process(
275 self.trace_config.take(),
276 self.transformation_config.service_name.take(),
277 );
278 let exporter = Exporter::new(
279 process.into(),
280 self.transformation_config.export_instrument_library,
281 self.build_sync_agent_uploader()?,
282 );
283
284 builder = builder.with_simple_exporter(exporter);
285 builder = builder.with_config(config);
286
287 Ok(builder.build())
288 }
289
290 /// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
291 ///
292 /// The exporter will collect spans in a batch and send them to the agent.
293 ///
294 /// It's possible to lose spans up to a batch when the application shuts down. So users should
295 /// use [`shut_down_tracer_provider`] to block the shut down process until
296 /// all remaining spans have been sent.
297 ///
298 /// Commonly used runtime are provided via `rt-tokio`, `rt-tokio-current-thread`, `rt-async-std`
299 /// features.
300 ///
301 /// [`shut_down_tracer_provider`]: opentelemetry::global::shutdown_tracer_provider
302 pub fn build_batch<R>(mut self, runtime: R) -> Result<TracerProvider, TraceError>
303 where
304 R: JaegerTraceRuntime,
305 {
306 let mut builder = TracerProvider::builder();
307
308 let export_instrument_library = self.transformation_config.export_instrument_library;
309 // build sdk trace config and jaeger process.
310 // some attributes like service name has attributes like service name
311 let (config, process) = build_config_and_process(
312 self.trace_config.take(),
313 self.transformation_config.service_name.take(),
314 );
315 let batch_config = self.batch_config.take();
316 let uploader = self.build_async_agent_uploader(runtime.clone())?;
317 let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
318 let batch_processor = BatchSpanProcessor::builder(exporter, runtime)
319 .with_batch_config(batch_config.unwrap_or_default())
320 .build();
321
322 builder = builder.with_span_processor(batch_processor);
323 builder = builder.with_config(config);
324
325 Ok(builder.build())
326 }
327
328 /// Similar to [`build_simple`][AgentPipeline::build_simple] but also returns a tracer from the
329 /// tracer provider.
330 ///
331 /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
332 pub fn install_simple(self) -> Result<Tracer, TraceError> {
333 let tracer_provider = self.build_simple()?;
334 install_tracer_provider_and_get_tracer(tracer_provider)
335 }
336
337 /// Similar to [`build_batch`][AgentPipeline::build_batch] but also returns a tracer from the
338 /// tracer provider.
339 ///
340 /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
341 pub fn install_batch<R>(self, runtime: R) -> Result<Tracer, TraceError>
342 where
343 R: JaegerTraceRuntime,
344 {
345 let tracer_provider = self.build_batch(runtime)?;
346 install_tracer_provider_and_get_tracer(tracer_provider)
347 }
348
349 /// Build an jaeger exporter targeting a jaeger agent and running on the async runtime.
350 pub fn build_async_agent_exporter<R>(
351 mut self,
352 runtime: R,
353 ) -> Result<crate::Exporter, TraceError>
354 where
355 R: JaegerTraceRuntime,
356 {
357 let export_instrument_library = self.transformation_config.export_instrument_library;
358 // build sdk trace config and jaeger process.
359 // some attributes like service name has attributes like service name
360 let (_, process) = build_config_and_process(
361 self.trace_config.take(),
362 self.transformation_config.service_name.take(),
363 );
364 let uploader = self.build_async_agent_uploader(runtime)?;
365 Ok(Exporter::new(
366 process.into(),
367 export_instrument_library,
368 uploader,
369 ))
370 }
371
372 /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
373 pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
374 let (_, process) = build_config_and_process(
375 self.trace_config.take(),
376 self.transformation_config.service_name.take(),
377 );
378 Ok(Exporter::new(
379 process.into(),
380 self.transformation_config.export_instrument_library,
381 self.build_sync_agent_uploader()?,
382 ))
383 }
384
385 fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Arc<dyn Uploader>, TraceError>
386 where
387 R: JaegerTraceRuntime,
388 {
389 let agent = AgentAsyncClientUdp::new(
390 self.max_packet_size,
391 runtime,
392 self.auto_split_batch,
393 self.resolve_endpoint()?,
394 )
395 .map_err::<Error, _>(Into::into)?;
396 Ok(Arc::new(AsyncUploader::Agent(
397 futures_util::lock::Mutex::new(agent),
398 )))
399 }
400
401 fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
402 let agent = AgentSyncClientUdp::new(
403 self.max_packet_size,
404 self.auto_split_batch,
405 self.resolve_endpoint()?,
406 )
407 .map_err::<Error, _>(Into::into)?;
408 Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
409 }
410
411 // resolve the agent endpoint from the environment variables or the builder
412 // if only one of the environment variables is set, the other one will be set to the default value
413 // if no environment variable is set, the builder value will be used.
414 fn resolve_endpoint(self) -> Result<Vec<net::SocketAddr>, TraceError> {
415 let endpoint_str = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
416 (Ok(host), Ok(port)) => format!("{}:{}", host.trim(), port.trim()),
417 (Ok(host), _) => format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim()),
418 (_, Ok(port)) => format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim()),
419 (_, _) => self.agent_endpoint.unwrap_or(format!(
420 "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
421 )),
422 };
423 endpoint_str
424 .to_socket_addrs()
425 .map(|addrs| addrs.collect())
426 .map_err(|io_err| {
427 Error::ConfigError {
428 pipeline_name: "agent",
429 config_name: "endpoint",
430 reason: io_err.to_string(),
431 }
432 .into()
433 })
434 }
435}
436
#[cfg(test)]
mod tests {
    use super::{ENV_AGENT_HOST, ENV_AGENT_PORT};
    use crate::config::agent::AgentPipeline;

    #[test]
    fn set_socket_address() {
        // `resolve_endpoint` lets the agent environment variables override the builder value,
        // so the cases below only hold when neither variable is present.
        if std::env::var_os(ENV_AGENT_HOST).is_some() || std::env::var_os(ENV_AGENT_PORT).is_some()
        {
            eprintln!("skipping set_socket_address: Jaeger agent env vars are set");
            return;
        }

        let test_cases = [
            // invalid inputs
            ("invalid_endpoint", false),
            ("0.0.0.0.0:9123", false),
            ("127.0.0.1", false), // port is needed
            // valid inputs
            ("[::0]:9123", true),
            ("127.0.0.1:1001", true),
        ];
        for (socket_str, is_ok) in test_cases {
            let resolved_endpoint = AgentPipeline::default()
                .with_endpoint(socket_str)
                .resolve_endpoint();
            assert_eq!(
                resolved_endpoint.is_ok(),
                // an invalid endpoint must surface as an error; it never falls back to the default
                is_ok,
                "endpoint string {}",
                socket_str
            );
        }
    }
}