use crate::trace::sampler::jaeger_remote::remote::{
PerOperationSamplingStrategies, ProbabilisticSamplingStrategy, RateLimitingSamplingStrategy,
SamplingStrategyResponse,
};
use crate::trace::sampler::sample_based_on_probability;
use crate::trace::{SamplingDecision, SamplingResult};
use opentelemetry::trace::{TraceContextExt, TraceId, TraceState};
use opentelemetry::{otel_warn, Context};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Mutex;
use super::rate_limit::LeakyBucket;
/// The sampling strategy currently installed in the sampler.
///
/// Exactly one variant is active at a time; it is built from a remote
/// `SamplingStrategyResponse` by `Inner::init_strategy`.
enum Strategy {
/// Sample using a fixed probability; the wrapped value is handed to
/// `sample_based_on_probability` together with the trace id.
Probabilistic(f64),
/// Sample according to a `LeakyBucket`; the bucket's `should_sample`
/// decides between `RecordAndSample` and `Drop`.
RateLimiting(LeakyBucket),
/// Sample using a probability looked up by span name, with a default
/// for names that have no dedicated entry.
PerOperation(PerOperationStrategies),
}
/// Shared state behind the Jaeger remote sampler.
pub(crate) struct Inner {
/// Currently installed strategy; `None` until `update` installs one.
strategy: Mutex<Option<Strategy>>,
/// Passed as the first argument to `LeakyBucket::new` whenever a
/// rate-limiting strategy is created.
leaky_bucket_size: f64,
/// Channel that receives a `()` message when this value is dropped.
// NOTE(review): presumably consumed by the background task that polls the
// remote endpoint — confirm against the code that owns the receiver.
shut_down: futures_channel::mpsc::Sender<()>,
}
impl Debug for Inner {
    /// Renders the value as `JaegerRemoteSamplerInner { leaky_bucket_size: .. }`.
    ///
    /// Only `leaky_bucket_size` is printed; the strategy and the shutdown
    /// sender are left out of the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("JaegerRemoteSamplerInner");
        builder.field("leaky_bucket_size", &self.leaky_bucket_size);
        builder.finish()
    }
}
impl Drop for Inner {
    /// Sends a `()` on the `shut_down` channel when the value is dropped.
    fn drop(&mut self) {
        // Best effort only: a failed send (for example a full or closed
        // channel) is deliberately ignored, since nothing useful can be done
        // about it while dropping.
        let _ignored_send_result = self.shut_down.try_send(());
    }
}
impl Inner {
    /// Creates the sampler state with no strategy installed yet.
    ///
    /// * `leaky_bucket_size` - size handed to every `LeakyBucket` that is
    ///   created for a rate-limiting strategy.
    /// * `shut_down` - channel that is notified when this value is dropped.
    pub(crate) fn new(
        leaky_bucket_size: f64,
        shut_down: futures_channel::mpsc::Sender<()>,
    ) -> Self {
        Inner {
            strategy: Mutex::new(None),
            leaky_bucket_size,
            shut_down,
        }
    }

    /// Applies a strategy response received from the remote endpoint.
    ///
    /// The response is treated as a union: `operation_sampling`,
    /// `rate_limiting_sampling` and `probabilistic_sampling` are expected to
    /// be mutually exclusive.
    ///
    /// * If the installed strategy is rate limiting and the response carries
    ///   only a rate-limiting strategy, the existing leaky bucket is updated
    ///   in place so that its accumulated state is kept.
    /// * Otherwise a fresh strategy is built with `init_strategy` and replaces
    ///   the installed one.
    /// * If the response carries no strategy at all, the previously installed
    ///   strategy (if any) is left untouched, which is what the warning
    ///   emitted by `init_strategy` promises.
    ///
    /// When the internal mutex is poisoned the update is skipped and a warning
    /// is emitted.
    pub(crate) fn update(&self, remote_strategy_resp: SamplingStrategyResponse) {
        match self.strategy.lock() {
            Ok(mut current) => {
                let operation_sampling = remote_strategy_resp.operation_sampling;
                let rate_limiting_sampling = remote_strategy_resp.rate_limiting_sampling;
                let probabilistic_sampling = remote_strategy_resp.probabilistic_sampling;

                // Only the leaky bucket is stateful, so it is the only
                // strategy that is updated in place instead of being rebuilt.
                if let (
                    None,
                    Some(rate_limiting),
                    None,
                    Some(Strategy::RateLimiting(leaky_bucket)),
                ) = (
                    &operation_sampling,
                    &rate_limiting_sampling,
                    &probabilistic_sampling,
                    current.as_mut(),
                ) {
                    leaky_bucket.update(rate_limiting.max_traces_per_second as f64);
                    return;
                }

                // Replace the installed strategy only when the response
                // produced a usable one; an invalid response must not wipe
                // out the last known-good configuration.
                if let Some(replacement) = self.init_strategy(
                    operation_sampling,
                    rate_limiting_sampling,
                    probabilistic_sampling,
                ) {
                    *current = Some(replacement);
                }
            }
            Err(_) => {
                otel_warn!(
                    name: "JaegerRemoteSampler.MutexPoisoned",
                    message = "Unable to update Jaeger Remote sampling strategy: the sampler's internal mutex is poisoned, likely due to a panic in another thread holding the lock. No further attempts to update the strategy will be made until the application or process restarts, and the last known configuration will continue to be used.",
                );
            }
        }
    }

    /// Builds a brand-new strategy from the parts of a remote response.
    ///
    /// Precedence when more than one part is present: per-operation, then
    /// rate limiting, then probabilistic. Returns `None` (after emitting a
    /// warning) when no part is present.
    fn init_strategy(
        &self,
        operation_sampling: Option<PerOperationSamplingStrategies>,
        rate_limiting_sampling: Option<RateLimitingSamplingStrategy>,
        probabilistic_sampling: Option<ProbabilisticSamplingStrategy>,
    ) -> Option<Strategy> {
        if let Some(op_sampling) = operation_sampling {
            let mut per_ops_sampling = PerOperationStrategies::default();
            per_ops_sampling.update(op_sampling);
            return Some(Strategy::PerOperation(per_ops_sampling));
        }

        if let Some(rate_limiting) = rate_limiting_sampling {
            return Some(Strategy::RateLimiting(LeakyBucket::new(
                self.leaky_bucket_size,
                rate_limiting.max_traces_per_second as f64,
            )));
        }

        if let Some(probabilistic) = probabilistic_sampling {
            return Some(Strategy::Probabilistic(probabilistic.sampling_rate));
        }

        otel_warn!(
            name: "JaegerRemoteSampler.InvalidStrategyReceived",
            message = "Invalid sampling strategy received from the remote endpoint. Expected one of: OperationSampling, RateLimitingSampling, or ProbabilisticSampling. Continuing to use the previous strategy or default sampler until a successful update.",
        );
        None
    }

    /// Makes a sampling decision with the currently installed strategy.
    ///
    /// Returns `None` when no strategy is installed or when the internal
    /// mutex is poisoned. Otherwise the result carries the strategy's
    /// decision, no attributes, and the trace state cloned from the span in
    /// `parent_context` (or a default trace state when there is no parent
    /// context).
    ///
    /// * `trace_id` - used by the probabilistic and per-operation strategies.
    /// * `name` - span name used to look up the per-operation probability.
    pub(crate) fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
    ) -> Option<SamplingResult> {
        // A poisoned lock is handled like "no strategy available".
        let mut guard = self.strategy.lock().ok()?;
        let strategy = guard.as_mut()?;

        let decision = match strategy {
            Strategy::RateLimiting(leaky_bucket) => {
                if leaky_bucket.should_sample() {
                    SamplingDecision::RecordAndSample
                } else {
                    SamplingDecision::Drop
                }
            }
            Strategy::Probabilistic(prob) => sample_based_on_probability(prob, trace_id),
            Strategy::PerOperation(per_operation_strategies) => sample_based_on_probability(
                &per_operation_strategies.get_probability(name),
                trace_id,
            ),
        };

        let trace_state = parent_context.map_or_else(TraceState::default, |ctx| {
            ctx.span().span_context().trace_state().clone()
        });

        Some(SamplingResult {
            decision,
            attributes: Vec::new(),
            trace_state,
        })
    }
}
/// Sampling probabilities keyed by operation name, plus the defaults that
/// apply to operations without a dedicated entry.
#[derive(Default)]
pub(crate) struct PerOperationStrategies {
    /// Probability returned for operations that have no entry in
    /// `operation_prob`.
    default_prob: f64,
    /// Copied from the remote response by `update`; not read by any code in
    /// this type.
    default_lower_bound_traces_per_second: f64,
    /// Operation name -> sampling probability.
    operation_prob: HashMap<String, f64>,
    /// Copied from the remote response by `update`; not read by any code in
    /// this type.
    default_upper_bound_traces_per_second: f64,
}

impl PerOperationStrategies {
    /// Overwrites every field with the values from `remote_strategies`.
    ///
    /// The per-operation table is rebuilt from scratch; if the same operation
    /// appears more than once, the last entry wins.
    pub(crate) fn update(&mut self, remote_strategies: PerOperationSamplingStrategies) {
        let mut table = HashMap::new();
        for entry in remote_strategies.per_operation_strategies {
            table.insert(entry.operation, entry.probabilistic_sampling.sampling_rate);
        }
        self.operation_prob = table;

        self.default_prob = remote_strategies.default_sampling_probability;
        self.default_lower_bound_traces_per_second =
            remote_strategies.default_lower_bound_traces_per_second;
        self.default_upper_bound_traces_per_second =
            remote_strategies.default_upper_bound_traces_per_second;
    }

    /// Returns the sampling probability for `operation`, falling back to the
    /// default probability when the operation has no dedicated entry.
    pub(crate) fn get_probability(&self, operation: &str) -> f64 {
        self.operation_prob
            .get(operation)
            .copied()
            .unwrap_or(self.default_prob)
    }
}
// Placeholder test module: compiled only for test builds and currently
// contains no tests.
#[cfg(test)]
mod tests {}