opentelemetry_sdk/trace/sampler/jaeger_remote/
sampler.rs1use crate::runtime::{to_interval_stream, RuntimeChannel};
2use crate::trace::error::TraceError;
3use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
4use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
5use crate::trace::{Sampler, ShouldSample};
6use futures_util::{stream, StreamExt as _};
7use http::Uri;
8use opentelemetry::trace::{Link, SamplingResult, SpanKind, TraceId};
9use opentelemetry::{otel_warn, Context, KeyValue};
10use opentelemetry_http::HttpClient;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14
/// Endpoint a freshly created `JaegerRemoteSamplerBuilder` starts with, and
/// the URL `get_endpoint` falls back to when the configured endpoint cannot be
/// parsed.
const DEFAULT_REMOTE_SAMPLER_ENDPOINT: &str = "http://localhost:5778/sampling";
16
17#[derive(Debug)]
20pub struct JaegerRemoteSamplerBuilder<C, S, R>
21where
22 R: RuntimeChannel,
23 C: HttpClient + 'static,
24 S: ShouldSample + 'static,
25{
26 pub(crate) update_interval: Duration,
27 pub(crate) client: C,
28 pub(crate) endpoint: String,
29 pub(crate) default_sampler: S,
30 pub(crate) leaky_bucket_size: f64,
31 pub(crate) runtime: R,
32 pub(crate) service_name: String,
33}
34
35impl<C, S, R> JaegerRemoteSamplerBuilder<C, S, R>
36where
37 C: HttpClient + 'static,
38 S: ShouldSample + 'static,
39 R: RuntimeChannel,
40{
41 pub(crate) fn new<Svc>(
42 runtime: R,
43 http_client: C,
44 default_sampler: S,
45 service_name: Svc,
46 ) -> Self
47 where
48 Svc: Into<String>,
49 {
50 JaegerRemoteSamplerBuilder {
51 runtime,
52 update_interval: Duration::from_secs(60 * 5),
53 client: http_client,
54 endpoint: DEFAULT_REMOTE_SAMPLER_ENDPOINT.to_string(),
55 default_sampler,
56 leaky_bucket_size: 100.0,
57 service_name: service_name.into(),
58 }
59 }
60
61 pub fn with_update_interval(self, interval: Duration) -> Self {
67 Self {
68 update_interval: interval,
69 ..self
70 }
71 }
72
73 pub fn with_endpoint<Str: Into<String>>(self, endpoint: Str) -> Self {
79 Self {
80 endpoint: endpoint.into(),
81 ..self
82 }
83 }
84
85 pub fn with_leaky_bucket_size(self, size: f64) -> Self {
91 Self {
92 leaky_bucket_size: size,
93 ..self
94 }
95 }
96
97 pub fn build(self) -> Result<Sampler, TraceError> {
104 let endpoint = Self::get_endpoint(&self.endpoint, &self.service_name)
105 .map_err(|err_str| TraceError::Other(err_str.into()))?;
106
107 Ok(Sampler::JaegerRemote(JaegerRemoteSampler::new(
108 self.runtime,
109 self.update_interval,
110 self.client,
111 endpoint,
112 self.default_sampler,
113 self.leaky_bucket_size,
114 )))
115 }
116
117 fn get_endpoint(endpoint: &str, service_name: &str) -> Result<Uri, String> {
118 if endpoint.is_empty() || service_name.is_empty() {
119 return Err("endpoint and service name cannot be empty".to_string());
120 }
121 let mut endpoint = url::Url::parse(endpoint)
122 .unwrap_or_else(|_| url::Url::parse(DEFAULT_REMOTE_SAMPLER_ENDPOINT).unwrap());
123
124 endpoint
125 .query_pairs_mut()
126 .append_pair("service", service_name);
127
128 Uri::from_str(endpoint.as_str()).map_err(|_err| "invalid service name".to_string())
129 }
130}
131
132#[derive(Clone, Debug)]
144pub struct JaegerRemoteSampler {
145 inner: Arc<Inner>,
146 default_sampler: Arc<dyn ShouldSample + 'static>,
147}
148
149impl JaegerRemoteSampler {
150 fn new<C, R, S>(
151 runtime: R,
152 update_timeout: Duration,
153 client: C,
154 endpoint: Uri,
155 default_sampler: S,
156 leaky_bucket_size: f64,
157 ) -> Self
158 where
159 R: RuntimeChannel,
160 C: HttpClient + 'static,
161 S: ShouldSample + 'static,
162 {
163 let (shutdown_tx, shutdown_rx) = futures_channel::mpsc::channel(1);
164 let inner = Arc::new(Inner::new(leaky_bucket_size, shutdown_tx));
165 let sampler = JaegerRemoteSampler {
166 inner,
167 default_sampler: Arc::new(default_sampler),
168 };
169 Self::run_update_task(
170 runtime,
171 sampler.inner.clone(),
172 update_timeout,
173 client,
174 shutdown_rx,
175 endpoint,
176 );
177 sampler
178 }
179
180 fn run_update_task<C, R>(
182 runtime: R,
183 strategy: Arc<Inner>,
184 update_timeout: Duration,
185 client: C,
186 shutdown: futures_channel::mpsc::Receiver<()>,
187 endpoint: Uri,
188 ) where
189 R: RuntimeChannel,
190 C: HttpClient + 'static,
191 {
192 let interval = to_interval_stream(runtime.clone(), update_timeout);
194
195 runtime.spawn(async move {
196 let mut update = Box::pin(stream::select(
198 shutdown.map(|_| false),
199 interval.map(|_| true),
200 ));
201
202 while let Some(should_update) = update.next().await {
203 if should_update {
204 match Self::request_new_strategy(&client, endpoint.clone()).await {
207 Ok(remote_strategy_resp) => strategy.update(remote_strategy_resp),
208 Err(err_msg) => {
209 otel_warn!(
210 name: "JaegerRemoteSampler.FailedToFetchStrategy",
211 message= "Failed to fetch the sampling strategy from the remote endpoint. The last successfully fetched configuration will be used if available; otherwise, the default sampler will be applied until a successful configuration fetch.",
212 reason = format!("{}", err_msg),
213 );
214 }
215 };
216 } else {
217 break;
219 }
220 }
221 });
222 }
223
224 async fn request_new_strategy<C>(
225 client: &C,
226 endpoint: Uri,
227 ) -> Result<SamplingStrategyResponse, String>
228 where
229 C: HttpClient,
230 {
231 let request = http::Request::get(endpoint)
232 .header("Content-Type", "application/json")
233 .body(Default::default())
234 .unwrap();
235
236 let resp = client
237 .send_bytes(request)
238 .await
239 .map_err(|err| format!("the request is failed to send {err}"))?;
240
241 if resp.status() != http::StatusCode::OK {
243 return Err(format!(
244 "the http response code is not 200 but {}",
245 resp.status()
246 ));
247 }
248
249 serde_json::from_slice(&resp.body()[..])
251 .map_err(|err| format!("cannot deserialize the response, {err}"))
252 }
253}
254
255impl ShouldSample for JaegerRemoteSampler {
256 fn should_sample(
257 &self,
258 parent_context: Option<&Context>,
259 trace_id: TraceId,
260 name: &str,
261 span_kind: &SpanKind,
262 attributes: &[KeyValue],
263 links: &[Link],
264 ) -> SamplingResult {
265 self.inner
266 .should_sample(parent_context, trace_id, name)
267 .unwrap_or_else(|| {
268 self.default_sampler.should_sample(
269 parent_context,
270 trace_id,
271 name,
272 span_kind,
273 attributes,
274 links,
275 )
276 })
277 }
278}
279
#[cfg(test)]
mod tests {
    use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyType;
    use std::fmt::{Debug, Formatter};

    // Test-only `Debug` impl so `assert_eq!` can print the enum on failure.
    impl Debug for SamplingStrategyType {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            let label = match self {
                SamplingStrategyType::Probabilistic => "Probabilistic",
                SamplingStrategyType::RateLimiting => "RateLimiting",
            };
            f.write_str(label)
        }
    }

    /// A probabilistic strategy payload decodes into the matching strategy
    /// type and sampling rate.
    #[test]
    fn deserialize_sampling_strategy_response() {
        let json = r#"{
            "strategyType": "PROBABILISTIC",
            "probabilisticSampling": {
                "samplingRate": 0.5
            }
        }"#;

        let resp: super::SamplingStrategyResponse = serde_json::from_str(json).unwrap();
        assert_eq!(resp.strategy_type, SamplingStrategyType::Probabilistic);

        let probabilistic = resp.probabilistic_sampling.unwrap();
        assert_eq!(probabilistic.sampling_rate, 0.5);
    }
}