#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use opentelemetry::{otel_debug, otel_info};
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use opentelemetry::otel_warn;
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use opentelemetry_sdk::runtime::Runtime;
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use std::future::Future;
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use std::hash::{DefaultHasher, Hasher};
use std::time::Duration;
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
use std::time::SystemTime;
/// Classification of a failed operation, produced by the caller-supplied
/// error classifier and used by `retry_with_backoff` to decide what to do next.
///
/// `Eq` is derived in addition to `PartialEq` because every payload
/// (`Duration`) has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryErrorType {
    /// The error is permanent: the operation is not retried and the error is
    /// returned to the caller immediately.
    NonRetryable,
    /// The error is transient: the operation is retried after an exponential
    /// backoff delay (plus jitter) while the retry budget lasts.
    Retryable,
    /// The operation was throttled: it is retried after waiting for the
    /// contained duration while the retry budget lasts.
    Throttled(Duration),
}
/// Configuration for `retry_with_backoff`.
///
/// All delays are expressed in milliseconds. The type is comparable
/// (`PartialEq`/`Eq`) so that configurations can be checked for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Maximum number of retries performed after the initial attempt.
    pub max_retries: usize,
    /// Delay before the first retry of a retryable error; doubled after each
    /// such retry.
    pub initial_delay_ms: u64,
    /// Upper bound applied to the backoff delay (jitter included) of a
    /// retryable error. Throttle delays are used as classified and are not
    /// clamped to this value.
    pub max_delay_ms: u64,
    /// Inclusive upper bound of the jitter added to each backoff delay.
    pub jitter_ms: u64,
}
#[cfg(any(
feature = "experimental-grpc-retry",
feature = "experimental-http-retry"
))]
/// Returns a pseudo-random jitter value in the inclusive range
/// `0..=max_jitter`.
///
/// The value is derived by hashing the sub-second nanoseconds of the current
/// system time, so it needs no random-number dependency. It is only meant to
/// spread retries out, not to be unpredictable.
///
/// This function never panics: a system clock set before the Unix epoch is
/// treated as zero nanoseconds, and `max_jitter == u64::MAX` is handled
/// without overflowing.
fn generate_jitter(max_jitter: u64) -> u64 {
    // `duration_since` fails when the clock is earlier than the epoch; fall
    // back to a fixed seed instead of panicking in an export path.
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.subsec_nanos());

    let mut hasher = DefaultHasher::default();
    hasher.write_u32(nanos);
    let hash = hasher.finish();

    // `max_jitter + 1` would overflow for `u64::MAX`; in that case every
    // hash value is already within range.
    match max_jitter.checked_add(1) {
        Some(exclusive_bound) => hash % exclusive_bound,
        None => hash,
    }
}
/// Runs `operation`, retrying it according to `policy` when it fails.
///
/// After every failure the error is passed to `error_classifier`:
///
/// * [`RetryErrorType::NonRetryable`] - the error is returned immediately.
/// * [`RetryErrorType::Retryable`] - while fewer than `policy.max_retries`
///   retries have been made, waits for the current backoff delay plus jitter
///   (clamped to `policy.max_delay_ms`), then doubles the backoff delay
///   (also clamped) and tries again.
/// * [`RetryErrorType::Throttled`] - while fewer than `policy.max_retries`
///   retries have been made, waits for the classified duration and tries
///   again. The backoff delay is left unchanged.
///
/// Once the retry budget is used up, the most recent error is returned.
///
/// # Arguments
///
/// * `runtime` - provides the `delay` future used to wait between attempts.
/// * `policy` - retry limits and delays.
/// * `error_classifier` - maps an error to a [`RetryErrorType`].
/// * `operation_name` - label attached to the emitted diagnostics.
/// * `operation` - factory invoked once per attempt to create the future to
///   await.
///
/// # Errors
///
/// Returns the error of the last attempt when it is classified as
/// non-retryable or when the retry budget is exhausted.
#[cfg(any(
    feature = "experimental-grpc-retry",
    feature = "experimental-http-retry"
))]
pub async fn retry_with_backoff<R, F, Fut, T, E, C>(
    runtime: R,
    policy: RetryPolicy,
    error_classifier: C,
    operation_name: &str,
    mut operation: F,
) -> Result<T, E>
where
    R: Runtime,
    F: FnMut() -> Fut,
    E: std::fmt::Debug,
    Fut: Future<Output = Result<T, E>>,
    C: Fn(&E) -> RetryErrorType,
{
    let mut attempt = 0;
    let mut delay = policy.initial_delay_ms;

    loop {
        let err = match operation().await {
            Ok(result) => return Ok(result),
            Err(err) => err,
        };

        match error_classifier(&err) {
            RetryErrorType::NonRetryable => {
                otel_warn!(name: "Export.Failed.NonRetryable",
                    operation = operation_name,
                    message = "OTLP export failed with non-retryable error - telemetry data will be lost");
                return Err(err);
            }
            RetryErrorType::Retryable if attempt < policy.max_retries => {
                attempt += 1;
                let jitter = generate_jitter(policy.jitter_ms);
                // Saturate instead of overflowing when the configured delays
                // are very large; the result is clamped to the maximum anyway.
                let delay_with_jitter =
                    std::cmp::min(delay.saturating_add(jitter), policy.max_delay_ms);
                otel_debug!(name: "Export.InProgress.Retrying",
                    operation = operation_name,
                    attempt = attempt,
                    delay_ms = delay_with_jitter,
                    jitter_ms = jitter,
                    message = "OTLP export failed with retryable error - retrying"
                );
                runtime
                    .delay(Duration::from_millis(delay_with_jitter))
                    .await;
                // Exponential backoff, capped at the configured maximum.
                delay = std::cmp::min(delay.saturating_mul(2), policy.max_delay_ms);
            }
            RetryErrorType::Throttled(server_delay) if attempt < policy.max_retries => {
                attempt += 1;
                otel_info!(name: "Export.InProgress.Throttled",
                    operation = operation_name,
                    attempt = attempt,
                    delay_ms = server_delay.as_millis(),
                    message = "OTLP export throttled by OTLP endpoint - delaying and retrying"
                );
                // Throttle delays are honoured as classified; they do not
                // affect the exponential backoff state.
                runtime.delay(server_delay).await;
            }
            // Retryable or throttled, but the retry budget is used up.
            _ => {
                otel_warn!(name: "Export.Failed.Exhausted",
                    operation = operation_name,
                    retries = attempt,
                    message = "OTLP export exhausted retries - telemetry data will be lost"
                );
                return Err(err);
            }
        }
    }
}
/// Pass-through variant of `retry_with_backoff`, compiled when neither
/// retry feature is enabled.
///
/// It keeps the same parameter list as the retrying variant so call sites do
/// not need their own feature gates, but it invokes `operation` exactly once
/// and returns its outcome unchanged. The runtime, policy, classifier and
/// operation name are accepted and ignored.
#[cfg(not(any(
    feature = "experimental-grpc-retry",
    feature = "experimental-http-retry"
)))]
pub async fn retry_with_backoff<R, F, Fut, T, E, C>(
    _runtime: R,
    _policy: RetryPolicy,
    _error_classifier: C,
    _operation_name: &str,
    mut operation: F,
) -> Result<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
    F: FnMut() -> Fut,
{
    // Single attempt: build the future once and hand its result straight back.
    let single_attempt = operation();
    single_attempt.await
}
#[cfg(all(test, feature = "experimental-grpc-retry"))]
mod tests {
    use super::*;
    use opentelemetry_sdk::runtime::Tokio;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;
    use tokio::time::timeout;

    /// Policy with a 100 ms initial delay, a 1600 ms cap and up to 100 ms of jitter.
    fn slow_policy(max_retries: usize) -> RetryPolicy {
        RetryPolicy {
            max_retries,
            initial_delay_ms: 100,
            max_delay_ms: 1600,
            jitter_ms: 100,
        }
    }

    /// Policy with a 10 ms initial delay, a 100 ms cap and up to 5 ms of jitter.
    fn fast_policy(max_retries: usize) -> RetryPolicy {
        RetryPolicy {
            max_retries,
            initial_delay_ms: 10,
            max_delay_ms: 100,
            jitter_ms: 5,
        }
    }

    /// The jitter never exceeds the requested upper bound.
    #[tokio::test]
    async fn test_generate_jitter() {
        let upper_bound = 100;
        let produced = generate_jitter(upper_bound);
        assert!(produced <= upper_bound);
    }

    /// An operation that succeeds on the first attempt is returned as-is.
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_success() {
        let outcome = retry_with_backoff(
            Tokio,
            slow_policy(3),
            |_: &()| RetryErrorType::Retryable,
            "test_operation",
            || Box::pin(async { Ok::<_, ()>("success") }),
        )
        .await;

        assert_eq!(outcome, Ok("success"));
    }

    /// Two retryable failures followed by a success take three attempts.
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_retries() {
        let calls = AtomicUsize::new(0);

        let outcome = retry_with_backoff(
            Tokio,
            slow_policy(3),
            |_: &&str| RetryErrorType::Retryable,
            "test_operation",
            || {
                let call_index = calls.fetch_add(1, Ordering::SeqCst);
                Box::pin(async move {
                    if call_index < 2 {
                        Err::<&str, &str>("error")
                    } else {
                        Ok::<&str, &str>("success")
                    }
                })
            },
        )
        .await;

        assert_eq!(outcome, Ok("success"));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// A permanently failing retryable operation is attempted once plus
    /// `max_retries` times, then its error is returned.
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_failure() {
        let calls = AtomicUsize::new(0);

        let outcome = retry_with_backoff(
            Tokio,
            slow_policy(3),
            |_: &&str| RetryErrorType::Retryable,
            "test_operation",
            || {
                calls.fetch_add(1, Ordering::SeqCst);
                Box::pin(async { Err::<(), _>("error") })
            },
        )
        .await;

        assert_eq!(outcome, Err("error"));
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    /// With a generous retry budget the backoff delays outlast a one-second
    /// timeout, so the outer `timeout` elapses first.
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_timeout() {
        let retrying = retry_with_backoff(
            Tokio,
            slow_policy(12),
            |_: &&str| RetryErrorType::Retryable,
            "test_operation",
            || Box::pin(async { Err::<(), _>("error") }),
        );

        let outcome = timeout(Duration::from_secs(1), retrying).await;

        assert!(outcome.is_err());
    }

    /// `RetryErrorType` compares by variant and payload.
    #[test]
    fn test_retry_error_type_equality() {
        assert_eq!(RetryErrorType::NonRetryable, RetryErrorType::NonRetryable);
        assert_eq!(RetryErrorType::Retryable, RetryErrorType::Retryable);
        assert_eq!(
            RetryErrorType::Throttled(Duration::from_secs(30)),
            RetryErrorType::Throttled(Duration::from_secs(30))
        );
        assert_ne!(RetryErrorType::Retryable, RetryErrorType::NonRetryable);
    }

    /// A non-retryable error stops after the first attempt.
    #[tokio::test]
    async fn test_retry_with_throttling_non_retryable_error() {
        let calls = AtomicUsize::new(0);
        let classify = |_: &()| RetryErrorType::NonRetryable;

        let outcome = retry_with_backoff(Tokio, slow_policy(3), classify, "test_operation", || {
            calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Err::<(), _>(()) })
        })
        .await;

        assert!(outcome.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// One retryable failure followed by a success takes two attempts.
    #[tokio::test]
    async fn test_retry_with_throttling_retryable_error() {
        let calls = AtomicUsize::new(0);
        let classify = |_: &()| RetryErrorType::Retryable;

        let outcome = retry_with_backoff(Tokio, fast_policy(2), classify, "test_operation", || {
            let call_index = calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move {
                if call_index < 1 {
                    Err::<&str, ()>(())
                } else {
                    Ok("success")
                }
            })
        })
        .await;

        assert_eq!(outcome, Ok("success"));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// A throttled failure waits at least the classified delay before the
    /// retry that succeeds.
    #[tokio::test]
    async fn test_retry_with_throttling_throttled_error() {
        let calls = AtomicUsize::new(0);
        let classify = |_: &()| RetryErrorType::Throttled(Duration::from_millis(10));

        let started = std::time::Instant::now();
        let outcome = retry_with_backoff(Tokio, slow_policy(2), classify, "test_operation", || {
            let call_index = calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move {
                if call_index < 1 {
                    Err::<&str, ()>(())
                } else {
                    Ok("success")
                }
            })
        })
        .await;
        let waited = started.elapsed();

        assert_eq!(outcome, Ok("success"));
        assert_eq!(calls.load(Ordering::SeqCst), 2);
        assert!(waited >= Duration::from_millis(10));
    }

    /// With a budget of one retry, a permanently failing operation is
    /// attempted exactly twice.
    #[tokio::test]
    async fn test_retry_with_throttling_max_attempts_exceeded() {
        let calls = AtomicUsize::new(0);
        let classify = |_: &()| RetryErrorType::Retryable;

        let outcome = retry_with_backoff(Tokio, fast_policy(1), classify, "test_operation", || {
            calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Err::<(), _>(()) })
        })
        .await;

        assert!(outcome.is_err());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    /// A retryable failure followed by a throttled failure both count against
    /// the same budget; the third attempt succeeds.
    #[tokio::test]
    async fn test_retry_with_throttling_mixed_error_types() {
        let calls = AtomicUsize::new(0);
        // The error value is the index of the failed attempt.
        let classify = |err: &usize| match *err {
            0 => RetryErrorType::Retryable,
            1 => RetryErrorType::Throttled(Duration::from_millis(10)),
            _ => RetryErrorType::Retryable,
        };

        let outcome = retry_with_backoff(Tokio, fast_policy(3), classify, "test_operation", || {
            let call_index = calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move {
                if call_index < 2 {
                    Err(call_index)
                } else {
                    Ok("success")
                }
            })
        })
        .await;

        assert_eq!(outcome, Ok("success"));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}