opentelemetry_jaeger/exporter/config/
agent.rs

1use std::borrow::BorrowMut;
2use std::net::ToSocketAddrs;
3use std::sync::Arc;
4use std::{env, net};
5
6use opentelemetry::trace::TraceError;
7use opentelemetry_sdk::trace::{BatchConfig, Config, TracerProvider};
8use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
9
10use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp};
11use crate::exporter::config::{
12    build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig,
13    TransformationConfig,
14};
15use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
16use crate::{Error, Exporter, JaegerTraceRuntime};
17
/// The max size of UDP packet we want to send, synced with jaeger-agent.
///
/// Used as the default `max_packet_size` of an [`AgentPipeline`].
const UDP_PACKET_MAX_LENGTH: usize = 65_000;

/// Name of the environment variable that holds the hostname of the Jaeger agent,
/// e.g. "localhost".
const ENV_AGENT_HOST: &str = "OTEL_EXPORTER_JAEGER_AGENT_HOST";

/// Name of the environment variable that holds the port of the Jaeger agent,
/// e.g. 6832.
const ENV_AGENT_PORT: &str = "OTEL_EXPORTER_JAEGER_AGENT_PORT";

/// Default agent host if none is provided.
const DEFAULT_AGENT_ENDPOINT_HOST: &str = "127.0.0.1";

/// Default agent port if none is provided.
///
/// Kept as a string because, in this file, it is only spliced into a `host:port` endpoint string.
const DEFAULT_AGENT_ENDPOINT_PORT: &str = "6831";
34
/// Deprecation Notice:
/// Ingestion of OTLP is now supported in Jaeger please check [crates.io] for more details.
///
/// `AgentPipeline` configures and builds an exporter targeting a jaeger agent, using UDP as the
/// transport layer protocol.
///
/// ## UDP packet max length
/// The exporter uses UDP to communicate with the agent. A UDP request may be rejected if it is too long.
/// See [UDP packet size] for details.
///
/// Users can utilise [`with_max_packet_size`] and [`with_auto_split_batch`] to avoid span loss or UDP request failures.
///
/// The default `max_packet_size` is `65000` ([why 65000]?). If your platform has a smaller limit on
/// UDP packets, you will need to adjust the `max_packet_size` accordingly.
///
/// Setting `auto_split_batch` to `true` configures the exporter to split the batch based on `max_packet_size`
/// automatically. Note that it has a performance overhead as every batch could require multiple requests to export.
///
/// For example, the OSX UDP packet limit is 9216 by default. You can configure the pipeline as follows
/// to keep UDP packets within the limit.
///
/// ```no_run
/// # use opentelemetry_sdk::runtime::Tokio;
/// # fn main() {
///     let tracer = opentelemetry_jaeger::new_agent_pipeline()
///         .with_endpoint("localhost:6831")
///         .with_service_name("my_app")
///         .with_max_packet_size(9_216)
///         .with_auto_split_batch(true)
///         .install_batch(Tokio).unwrap();
/// # }
/// ```
///
/// [`with_auto_split_batch`]: AgentPipeline::with_auto_split_batch
/// [`with_max_packet_size`]: AgentPipeline::with_max_packet_size
/// [`with_endpoint`]: AgentPipeline::with_endpoint
/// [UDP packet size]: https://stackoverflow.com/questions/1098897/what-is-the-largest-safe-udp-packet-size-on-the-internet
/// [why 65000]: https://serverfault.com/questions/246508/how-is-the-mtu-is-65535-in-udp-but-ethernet-does-not-allow-frame-size-more-than
/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
///
/// ## Environment variables
/// The following environment variables are available to configure the agent exporter.
/// When at least one of them is set, the endpoint is built from them and the value given to
/// [`with_endpoint`] is not used.
///
/// - `OTEL_EXPORTER_JAEGER_AGENT_HOST`, set the host of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_PORT`
///   is not set, the exporter will use 6831 as the port.
/// - `OTEL_EXPORTER_JAEGER_AGENT_PORT`, set the port of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_HOST`
///   is not set, the exporter will use 127.0.0.1 as the host.
#[derive(Debug)]
#[deprecated(
    since = "0.21.0",
    note = "Please migrate to opentelemetry-otlp exporter."
)]
pub struct AgentPipeline {
    /// Holds the service name and the flag for exporting instrumentation library information.
    transformation_config: TransformationConfig,
    /// SDK trace config; `None` until one is assigned.
    trace_config: Option<Config>,
    /// Batch span processor config; only read when building a batch pipeline.
    batch_config: Option<BatchConfig>,
    /// Agent endpoint as a `host:port` string; used when no endpoint environment variable is set.
    agent_endpoint: Option<String>,
    /// Max UDP packet size in bytes, handed to the UDP agent client.
    max_packet_size: usize,
    /// Whether batches should be split automatically; handed to the UDP agent client.
    auto_split_batch: bool,
}
93
94impl Default for AgentPipeline {
95    fn default() -> Self {
96        AgentPipeline {
97            transformation_config: Default::default(),
98            trace_config: Default::default(),
99            batch_config: Some(Default::default()),
100            agent_endpoint: Some(format!(
101                "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
102            )),
103            max_packet_size: UDP_PACKET_MAX_LENGTH,
104            auto_split_batch: false,
105        }
106    }
107}
108
109// implement the seal trait
110impl HasRequiredConfig for AgentPipeline {
111    fn set_transformation_config<T>(&mut self, f: T)
112    where
113        T: FnOnce(&mut TransformationConfig),
114    {
115        f(self.transformation_config.borrow_mut())
116    }
117
118    fn set_trace_config(&mut self, config: Config) {
119        self.trace_config = Some(config)
120    }
121
122    fn set_batch_config(&mut self, config: BatchConfig) {
123        self.batch_config = Some(config)
124    }
125}
126
127/// Start a new pipeline to configure a exporter that target a jaeger agent.
128///
129/// See details for each configurations at [`AgentPipeline`]
130///
131/// Deprecation Notice:
132/// Ingestion of OTLP is now supported in Jaeger please check [crates.io] for more details.
133///
134/// [`AgentPipeline`]: crate::config::agent::AgentPipeline
135/// [crates.io]: https://crates.io/crates/opentelemetry-jaeger
136#[deprecated(
137    since = "0.21.0",
138    note = "Please migrate to opentelemetry-otlp exporter."
139)]
140pub fn new_agent_pipeline() -> AgentPipeline {
141    AgentPipeline::default()
142}
143
144impl AgentPipeline {
145    /// set the endpoint of the agent.
146    ///
147    /// It usually composed by host ip and the port number.
148    /// Any valid socket address can be used.
149    ///
150    /// Default to be `127.0.0.1:6831`.
151    pub fn with_endpoint<T: Into<String>>(self, agent_endpoint: T) -> Self {
152        AgentPipeline {
153            agent_endpoint: Some(agent_endpoint.into()),
154            ..self
155        }
156    }
157
158    /// Assign the max packet size in bytes.
159    ///
160    /// It should be consistent with the limit of platforms. Otherwise, UDP requests maybe reject with
161    /// error like `thrift agent failed with transport error` or `thrift agent failed with message too long`.
162    ///
163    /// The exporter will cut off spans if the batch is long. To avoid this, set [auto_split_batch](AgentPipeline::with_auto_split_batch) to `true`
164    /// to split a batch into multiple UDP packets.
165    ///
166    /// Default to be `65000`.
167    pub fn with_max_packet_size(self, max_packet_size: usize) -> Self {
168        AgentPipeline {
169            max_packet_size,
170            ..self
171        }
172    }
173
174    /// Config whether to auto split batches.
175    ///
176    /// When auto split is set to `true`, the exporter will try to split the
177    /// batch into smaller ones so that there will be minimal data loss. It
178    /// will impact the performance.
179    ///
180    /// Note that if the length of one serialized span is longer than the `max_packet_size`.
181    /// The exporter will return an error as it cannot export the span. Use jaeger collector
182    /// instead of jaeger agent may be help in this case as the exporter will use HTTP to communicate
183    /// with jaeger collector.
184    ///
185    /// Default to be `false`.
186    pub fn with_auto_split_batch(mut self, should_auto_split: bool) -> Self {
187        self.auto_split_batch = should_auto_split;
188        self
189    }
190
191    /// Set the service name of the application. It generally is the name of application.
192    /// Critically, Jaeger backend depends on `Span.Process.ServiceName` to identify the service
193    /// that produced the spans.
194    ///
195    /// Opentelemetry allows set the service name using multiple methods.
196    /// This functions takes priority over all other methods.
197    ///
198    /// If the service name is not set. It will default to be `unknown_service`.
199    pub fn with_service_name<T: Into<String>>(mut self, service_name: T) -> Self {
200        self.set_transformation_config(|config| {
201            config.service_name = Some(service_name.into());
202        });
203        self
204    }
205
206    /// Config whether to export information of instrumentation library.
207    ///
208    /// It's required to [report instrumentation library as span tags].
209    /// However it does have a overhead on performance, performance sensitive applications can
210    /// use this function to opt out reporting instrumentation library.
211    ///
212    /// Default to be `true`.
213    ///
214    /// [report instrumentation library as span tags]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/non-otlp.md#instrumentationscope
215    pub fn with_instrumentation_library_tags(mut self, should_export: bool) -> Self {
216        self.set_transformation_config(|config| {
217            config.export_instrument_library = should_export;
218        });
219        self
220    }
221
222    /// Assign the opentelemetry SDK configurations for the exporter pipeline.
223    ///
224    /// For mapping between opentelemetry configurations and Jaeger spans. Please refer [the spec].
225    ///
226    /// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/jaeger.md#mappings
227    /// # Examples
228    /// Set service name via resource.
229    /// ```rust
230    /// use opentelemetry::KeyValue;
231    /// use opentelemetry_sdk::{Resource, trace::Config};
232    ///
233    /// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
234    ///                 .with_trace_config(
235    ///                       Config::default()
236    ///                         .with_resource(Resource::new(vec![KeyValue::new("service.name", "my-service")]))
237    ///                 );
238    ///
239    /// ```
240    pub fn with_trace_config(mut self, config: Config) -> Self {
241        self.set_trace_config(config);
242        self
243    }
244
245    /// Assign the batch span processor for the exporter pipeline.
246    ///
247    /// If a simple span processor is used by [`install_simple`][AgentPipeline::install_simple]
248    /// or [`build_simple`][AgentPipeline::install_simple], then this config will not be ignored.
249    ///
250    /// # Examples
251    /// Set max queue size.
252    /// ```rust
253    /// use opentelemetry_sdk::trace::BatchConfigBuilder;
254    ///
255    /// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
256    ///                 .with_batch_processor_config(
257    ///                       BatchConfigBuilder::default()
258    ///                         .with_max_queue_size(200)
259    ///                         .build()
260    ///                 );
261    ///
262    /// ```
263    pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
264        self.set_batch_config(config);
265        self
266    }
267
268    /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
269    ///
270    /// The exporter will send each span to the agent upon the span ends.
271    pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
272        let mut builder = TracerProvider::builder();
273
274        let (config, process) = build_config_and_process(
275            self.trace_config.take(),
276            self.transformation_config.service_name.take(),
277        );
278        let exporter = Exporter::new(
279            process.into(),
280            self.transformation_config.export_instrument_library,
281            self.build_sync_agent_uploader()?,
282        );
283
284        builder = builder.with_simple_exporter(exporter);
285        builder = builder.with_config(config);
286
287        Ok(builder.build())
288    }
289
290    /// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
291    ///
292    /// The exporter will collect spans in a batch and send them to the agent.
293    ///
294    /// It's possible to lose spans up to a batch when the application shuts down. So users should
295    /// use [`shut_down_tracer_provider`] to block the shut down process until
296    /// all remaining spans have been sent.
297    ///
298    /// Commonly used runtime are provided via `rt-tokio`, `rt-tokio-current-thread`, `rt-async-std`
299    /// features.
300    ///
301    /// [`shut_down_tracer_provider`]: opentelemetry::global::shutdown_tracer_provider
302    pub fn build_batch<R>(mut self, runtime: R) -> Result<TracerProvider, TraceError>
303    where
304        R: JaegerTraceRuntime,
305    {
306        let mut builder = TracerProvider::builder();
307
308        let export_instrument_library = self.transformation_config.export_instrument_library;
309        // build sdk trace config and jaeger process.
310        // some attributes like service name has attributes like service name
311        let (config, process) = build_config_and_process(
312            self.trace_config.take(),
313            self.transformation_config.service_name.take(),
314        );
315        let batch_config = self.batch_config.take();
316        let uploader = self.build_async_agent_uploader(runtime.clone())?;
317        let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
318        let batch_processor = BatchSpanProcessor::builder(exporter, runtime)
319            .with_batch_config(batch_config.unwrap_or_default())
320            .build();
321
322        builder = builder.with_span_processor(batch_processor);
323        builder = builder.with_config(config);
324
325        Ok(builder.build())
326    }
327
328    /// Similar to [`build_simple`][AgentPipeline::build_simple] but also returns a tracer from the
329    /// tracer provider.
330    ///
331    /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
332    pub fn install_simple(self) -> Result<Tracer, TraceError> {
333        let tracer_provider = self.build_simple()?;
334        install_tracer_provider_and_get_tracer(tracer_provider)
335    }
336
337    /// Similar to [`build_batch`][AgentPipeline::build_batch] but also returns a tracer from the
338    /// tracer provider.
339    ///
340    /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
341    pub fn install_batch<R>(self, runtime: R) -> Result<Tracer, TraceError>
342    where
343        R: JaegerTraceRuntime,
344    {
345        let tracer_provider = self.build_batch(runtime)?;
346        install_tracer_provider_and_get_tracer(tracer_provider)
347    }
348
349    /// Build an jaeger exporter targeting a jaeger agent and running on the async runtime.
350    pub fn build_async_agent_exporter<R>(
351        mut self,
352        runtime: R,
353    ) -> Result<crate::Exporter, TraceError>
354    where
355        R: JaegerTraceRuntime,
356    {
357        let export_instrument_library = self.transformation_config.export_instrument_library;
358        // build sdk trace config and jaeger process.
359        // some attributes like service name has attributes like service name
360        let (_, process) = build_config_and_process(
361            self.trace_config.take(),
362            self.transformation_config.service_name.take(),
363        );
364        let uploader = self.build_async_agent_uploader(runtime)?;
365        Ok(Exporter::new(
366            process.into(),
367            export_instrument_library,
368            uploader,
369        ))
370    }
371
372    /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
373    pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
374        let (_, process) = build_config_and_process(
375            self.trace_config.take(),
376            self.transformation_config.service_name.take(),
377        );
378        Ok(Exporter::new(
379            process.into(),
380            self.transformation_config.export_instrument_library,
381            self.build_sync_agent_uploader()?,
382        ))
383    }
384
385    fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Arc<dyn Uploader>, TraceError>
386    where
387        R: JaegerTraceRuntime,
388    {
389        let agent = AgentAsyncClientUdp::new(
390            self.max_packet_size,
391            runtime,
392            self.auto_split_batch,
393            self.resolve_endpoint()?,
394        )
395        .map_err::<Error, _>(Into::into)?;
396        Ok(Arc::new(AsyncUploader::Agent(
397            futures_util::lock::Mutex::new(agent),
398        )))
399    }
400
401    fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
402        let agent = AgentSyncClientUdp::new(
403            self.max_packet_size,
404            self.auto_split_batch,
405            self.resolve_endpoint()?,
406        )
407        .map_err::<Error, _>(Into::into)?;
408        Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
409    }
410
411    // resolve the agent endpoint from the environment variables or the builder
412    // if only one of the environment variables is set, the other one will be set to the default value
413    // if no environment variable is set, the builder value will be used.
414    fn resolve_endpoint(self) -> Result<Vec<net::SocketAddr>, TraceError> {
415        let endpoint_str = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
416            (Ok(host), Ok(port)) => format!("{}:{}", host.trim(), port.trim()),
417            (Ok(host), _) => format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim()),
418            (_, Ok(port)) => format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim()),
419            (_, _) => self.agent_endpoint.unwrap_or(format!(
420                "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
421            )),
422        };
423        endpoint_str
424            .to_socket_addrs()
425            .map(|addrs| addrs.collect())
426            .map_err(|io_err| {
427                Error::ConfigError {
428                    pipeline_name: "agent",
429                    config_name: "endpoint",
430                    reason: io_err.to_string(),
431                }
432                .into()
433            })
434    }
435}
436
#[cfg(test)]
mod tests {
    use crate::config::agent::AgentPipeline;

    /// Each endpoint string paired with whether resolving it is expected to succeed.
    const SOCKET_ADDRESS_CASES: [(&str, bool); 5] = [
        // invalid inputs
        ("invalid_endpoint", false),
        ("0.0.0.0.0:9123", false),
        ("127.0.0.1", false), // port is needed
        // valid inputs
        ("[::0]:9123", true),
        ("127.0.0.1:1001", true),
    ];

    /// Checks that `resolve_endpoint` accepts or rejects the endpoint given to `with_endpoint`.
    #[test]
    fn set_socket_address() {
        for &(socket_str, is_ok) in SOCKET_ADDRESS_CASES.iter() {
            let pipeline = AgentPipeline::default().with_endpoint(socket_str);
            let resolved_endpoint = pipeline.resolve_endpoint();
            assert_eq!(
                resolved_endpoint.is_ok(),
                is_ok,
                "endpoint string {}",
                socket_str
            );
        }
    }
}