opentelemetry_sdk/trace/sampler/jaeger_remote/
sampler.rs

1use crate::runtime::{to_interval_stream, RuntimeChannel};
2use crate::trace::error::TraceError;
3use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
4use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
5use crate::trace::{Sampler, ShouldSample};
6use futures_util::{stream, StreamExt as _};
7use http::Uri;
8use opentelemetry::trace::{Link, SamplingResult, SpanKind, TraceId};
9use opentelemetry::{otel_warn, Context, KeyValue};
10use opentelemetry_http::HttpClient;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14
/// Endpoint the builder starts with, and the fallback used when a configured
/// endpoint cannot be parsed as a URL.
15const DEFAULT_REMOTE_SAMPLER_ENDPOINT: &str = "http://localhost:5778/sampling";
16
17/// Builder for [`JaegerRemoteSampler`].
18/// See [Sampler::jaeger_remote] for details.
19#[derive(Debug)]
20pub struct JaegerRemoteSamplerBuilder<C, S, R>
21where
22    R: RuntimeChannel,
23    C: HttpClient + 'static,
24    S: ShouldSample + 'static,
25{
26    pub(crate) update_interval: Duration,
27    pub(crate) client: C,
28    pub(crate) endpoint: String,
29    pub(crate) default_sampler: S,
30    pub(crate) leaky_bucket_size: f64,
31    pub(crate) runtime: R,
32    pub(crate) service_name: String,
33}
34
35impl<C, S, R> JaegerRemoteSamplerBuilder<C, S, R>
36where
37    C: HttpClient + 'static,
38    S: ShouldSample + 'static,
39    R: RuntimeChannel,
40{
41    pub(crate) fn new<Svc>(
42        runtime: R,
43        http_client: C,
44        default_sampler: S,
45        service_name: Svc,
46    ) -> Self
47    where
48        Svc: Into<String>,
49    {
50        JaegerRemoteSamplerBuilder {
51            runtime,
52            update_interval: Duration::from_secs(60 * 5),
53            client: http_client,
54            endpoint: DEFAULT_REMOTE_SAMPLER_ENDPOINT.to_string(),
55            default_sampler,
56            leaky_bucket_size: 100.0,
57            service_name: service_name.into(),
58        }
59    }
60
61    /// Change how often the SDK should fetch the sampling strategy from remote servers
62    ///
63    /// By default it fetches every 5 minutes.
64    ///
65    /// A shorter interval have a performance overhead and should be avoid.
66    pub fn with_update_interval(self, interval: Duration) -> Self {
67        Self {
68            update_interval: interval,
69            ..self
70        }
71    }
72
73    /// The endpoint of remote servers.
74    ///
75    /// By default it's `http://localhost:5778/sampling`.
76    ///
77    /// If service name is provided as part of the endpoint, it will be ignored.
78    pub fn with_endpoint<Str: Into<String>>(self, endpoint: Str) -> Self {
79        Self {
80            endpoint: endpoint.into(),
81            ..self
82        }
83    }
84
85    /// The size of the leaky bucket.
86    ///
87    /// By default the size is 100.
88    ///
89    /// It's used when sampling strategy is rate limiting.
90    pub fn with_leaky_bucket_size(self, size: f64) -> Self {
91        Self {
92            leaky_bucket_size: size,
93            ..self
94        }
95    }
96
97    /// Build a [JaegerRemoteSampler] using provided configuration.
98    ///
99    /// Return errors if:
100    ///
101    /// - the endpoint provided is empty.
102    /// - the service name provided is empty.
103    pub fn build(self) -> Result<Sampler, TraceError> {
104        let endpoint = Self::get_endpoint(&self.endpoint, &self.service_name)
105            .map_err(|err_str| TraceError::Other(err_str.into()))?;
106
107        Ok(Sampler::JaegerRemote(JaegerRemoteSampler::new(
108            self.runtime,
109            self.update_interval,
110            self.client,
111            endpoint,
112            self.default_sampler,
113            self.leaky_bucket_size,
114        )))
115    }
116
117    fn get_endpoint(endpoint: &str, service_name: &str) -> Result<Uri, String> {
118        if endpoint.is_empty() || service_name.is_empty() {
119            return Err("endpoint and service name cannot be empty".to_string());
120        }
121        let mut endpoint = url::Url::parse(endpoint)
122            .unwrap_or_else(|_| url::Url::parse(DEFAULT_REMOTE_SAMPLER_ENDPOINT).unwrap());
123
124        endpoint
125            .query_pairs_mut()
126            .append_pair("service", service_name);
127
128        Uri::from_str(endpoint.as_str()).map_err(|_err| "invalid service name".to_string())
129    }
130}
131
132/// Sampler that fetches the sampling configuration from remotes.
133///
134/// It offers the following sampling strategies:
135/// - **Probabilistic**, fetch a probability between [0.0, 1.0] from remotes and use it to sample traces. If the probability is 0.0, it will never sample traces. If the probability is 1.0, it will always sample traces.
136/// - **Rate limiting**, ses a leaky bucket rate limiter to ensure that traces are sampled with a certain constant rate.
137/// - **Per Operations**, instead of sampling all traces, it samples traces based on the span name. Only probabilistic sampling is supported at the moment.
138///
139/// User can build a [`JaegerRemoteSampler`] by getting a [`JaegerRemoteSamplerBuilder`] from [`Sampler::jaeger_remote`].
140///
141/// Note that the backend doesn't need to be Jaeger so long as it supports jaeger remote sampling
142/// protocol.
143#[derive(Clone, Debug)]
144pub struct JaegerRemoteSampler {
145    inner: Arc<Inner>,
146    default_sampler: Arc<dyn ShouldSample + 'static>,
147}
148
149impl JaegerRemoteSampler {
150    fn new<C, R, S>(
151        runtime: R,
152        update_timeout: Duration,
153        client: C,
154        endpoint: Uri,
155        default_sampler: S,
156        leaky_bucket_size: f64,
157    ) -> Self
158    where
159        R: RuntimeChannel,
160        C: HttpClient + 'static,
161        S: ShouldSample + 'static,
162    {
163        let (shutdown_tx, shutdown_rx) = futures_channel::mpsc::channel(1);
164        let inner = Arc::new(Inner::new(leaky_bucket_size, shutdown_tx));
165        let sampler = JaegerRemoteSampler {
166            inner,
167            default_sampler: Arc::new(default_sampler),
168        };
169        Self::run_update_task(
170            runtime,
171            sampler.inner.clone(),
172            update_timeout,
173            client,
174            shutdown_rx,
175            endpoint,
176        );
177        sampler
178    }
179
180    // start a updating thread/task
181    fn run_update_task<C, R>(
182        runtime: R,
183        strategy: Arc<Inner>,
184        update_timeout: Duration,
185        client: C,
186        shutdown: futures_channel::mpsc::Receiver<()>,
187        endpoint: Uri,
188    ) where
189        R: RuntimeChannel,
190        C: HttpClient + 'static,
191    {
192        // todo: review if we need 'static here
193        let interval = to_interval_stream(runtime.clone(), update_timeout);
194
195        runtime.spawn(async move {
196            // either update or shutdown
197            let mut update = Box::pin(stream::select(
198                shutdown.map(|_| false),
199                interval.map(|_| true),
200            ));
201
202            while let Some(should_update) = update.next().await {
203                if should_update {
204                    // poll next available configuration or shutdown
205                    // send request
206                    match Self::request_new_strategy(&client, endpoint.clone()).await {
207                        Ok(remote_strategy_resp) => strategy.update(remote_strategy_resp),
208                        Err(err_msg) => {
209                            otel_warn!(
210                                name: "JaegerRemoteSampler.FailedToFetchStrategy",
211                                message= "Failed to fetch the sampling strategy from the remote endpoint. The last successfully fetched configuration will be used if available; otherwise, the default sampler will be applied until a successful configuration fetch.",
212                                reason = format!("{}", err_msg),
213                            );
214                        }
215                    };
216                } else {
217                    // shutdown
218                    break;
219                }
220            }
221        });
222    }
223
224    async fn request_new_strategy<C>(
225        client: &C,
226        endpoint: Uri,
227    ) -> Result<SamplingStrategyResponse, String>
228    where
229        C: HttpClient,
230    {
231        let request = http::Request::get(endpoint)
232            .header("Content-Type", "application/json")
233            .body(Default::default())
234            .unwrap();
235
236        let resp = client
237            .send_bytes(request)
238            .await
239            .map_err(|err| format!("the request is failed to send {err}"))?;
240
241        // process failures
242        if resp.status() != http::StatusCode::OK {
243            return Err(format!(
244                "the http response code is not 200 but {}",
245                resp.status()
246            ));
247        }
248
249        // deserialize the response
250        serde_json::from_slice(&resp.body()[..])
251            .map_err(|err| format!("cannot deserialize the response, {err}"))
252    }
253}
254
255impl ShouldSample for JaegerRemoteSampler {
256    fn should_sample(
257        &self,
258        parent_context: Option<&Context>,
259        trace_id: TraceId,
260        name: &str,
261        span_kind: &SpanKind,
262        attributes: &[KeyValue],
263        links: &[Link],
264    ) -> SamplingResult {
265        self.inner
266            .should_sample(parent_context, trace_id, name)
267            .unwrap_or_else(|| {
268                self.default_sampler.should_sample(
269                    parent_context,
270                    trace_id,
271                    name,
272                    span_kind,
273                    attributes,
274                    links,
275                )
276            })
277    }
278}
279
#[cfg(test)]
mod tests {
    use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyType;
    use std::fmt::{Debug, Formatter};

    // Test-only `Debug` implementation so the strategy type can be used in `assert_eq!`.
    impl Debug for SamplingStrategyType {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            let label = match self {
                SamplingStrategyType::Probabilistic => "Probabilistic",
                SamplingStrategyType::RateLimiting => "RateLimiting",
            };
            f.write_str(label)
        }
    }

    /// A probabilistic strategy response deserializes into the expected type and rate.
    #[test]
    fn deserialize_sampling_strategy_response() {
        let json = r#"{
            "strategyType": "PROBABILISTIC",
            "probabilisticSampling": {
                "samplingRate": 0.5
            }
        }"#;
        let parsed = serde_json::from_str::<super::SamplingStrategyResponse>(json).unwrap();
        assert_eq!(parsed.strategy_type, SamplingStrategyType::Probabilistic);

        let probabilistic = parsed.probabilistic_sampling.unwrap();
        assert_eq!(probabilistic.sampling_rate, 0.5);
    }
}