use std::{
future::Future,
time::{Duration, Instant},
};
use crate::{
cache_redis::{
RedisCacheError, RedisCacheResult, RedisClusterRedirectKind, RedisCommandEvent,
RedisCommandEventKind, RedisCommandOutcome, RedisDegradationEvent,
},
resil::BreakerError,
};
/// Records Redis command outcomes, events and degradations under a shard label.
///
/// When the `observability` feature is disabled the recorder keeps only the
/// shard label and every recording method is a no-op.
#[derive(Debug, Clone)]
pub struct RedisCommandRecorder {
    /// Optional metrics registry passed to the observability hooks.
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
    /// Shard label passed along with every recorded command and event.
    shard: String,
}

impl Default for RedisCommandRecorder {
    /// Builds a recorder with the `"default"` shard label and no metrics registry.
    ///
    /// A derived `Default` would leave `shard` empty, which disagrees with
    /// `RedisCommandRecorder::new`; this impl keeps both constructors in sync.
    fn default() -> Self {
        Self {
            #[cfg(feature = "observability")]
            metrics: None,
            shard: "default".to_string(),
        }
    }
}
impl RedisCommandRecorder {
    /// Creates a recorder labelled with the `"default"` shard and no metrics registry.
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "observability")]
            metrics: None,
            shard: String::from("default"),
        }
    }

    /// Returns the recorder with its shard label replaced by `shard`.
    pub fn with_shard(self, shard: impl Into<String>) -> Self {
        let mut recorder = self;
        recorder.shard = shard.into();
        recorder
    }

    /// Returns the recorder with `metrics` installed as its registry.
    #[cfg(feature = "observability")]
    pub fn with_metrics(self, metrics: crate::observability::MetricsRegistry) -> Self {
        let mut recorder = self;
        recorder.metrics = Some(metrics);
        recorder
    }

    /// Reports one finished command: its operation name, outcome and duration.
    ///
    /// Without the `observability` feature the arguments are discarded.
    pub fn record_command(
        &self,
        operation: &str,
        outcome: RedisCommandOutcome,
        duration: Duration,
    ) {
        #[cfg(feature = "observability")]
        {
            crate::observability::record_redis_command(
                self.metrics.as_ref(),
                operation,
                &self.shard,
                outcome.as_str(),
                duration,
            );
        }
        #[cfg(not(feature = "observability"))]
        {
            let _ = (operation, outcome, duration);
        }
    }

    /// Reports a command event (its kind and outcome) for this recorder's shard.
    ///
    /// Without the `observability` feature the event is discarded.
    pub fn record_event(&self, event: RedisCommandEvent) {
        #[cfg(feature = "observability")]
        {
            crate::observability::record_redis_event(
                self.metrics.as_ref(),
                event.event.as_str(),
                &self.shard,
                event.outcome.as_str(),
            );
        }
        #[cfg(not(feature = "observability"))]
        {
            let _ = event;
        }
    }

    /// Reports a cluster redirect of the given `kind` as a command event.
    pub fn record_redirect(&self, kind: RedisClusterRedirectKind) {
        let redirect = RedisCommandEvent::redirect(kind);
        self.record_event(redirect);
    }

    /// Reports a degradation, then a generic `Degradation`/`Degraded` event.
    pub fn record_degradation(&self, event: RedisDegradationEvent) {
        #[cfg(feature = "observability")]
        {
            crate::observability::record_redis_degradation(
                self.metrics.as_ref(),
                event.operation.as_str(),
                event.action.as_str(),
                &self.shard,
            );
        }
        #[cfg(not(feature = "observability"))]
        {
            let _ = event;
        }

        // The degradation is also surfaced through the generic event channel.
        let degraded = RedisCommandEvent::new(
            RedisCommandEventKind::Degradation,
            RedisCommandOutcome::Degraded,
        );
        self.record_event(degraded);
    }

    /// Reports a breaker event carrying the given `outcome`.
    pub fn record_breaker(&self, outcome: RedisCommandOutcome) {
        let breaker = RedisCommandEvent::new(RedisCommandEventKind::Breaker, outcome);
        self.record_event(breaker);
    }
}
pub async fn run_redis_command<T, F>(
recorder: &RedisCommandRecorder,
operation: &'static str,
command_timeout: Duration,
future: F,
) -> RedisCacheResult<T>
where
F: Future<Output = redis::RedisResult<T>>,
{
run_redis_cache_command(recorder, operation, command_timeout, async {
future.await.map_err(redis_error)
})
.await
}
pub(crate) async fn run_redis_cache_command<T, F>(
recorder: &RedisCommandRecorder,
operation: &'static str,
command_timeout: Duration,
future: F,
) -> RedisCacheResult<T>
where
F: Future<Output = RedisCacheResult<T>>,
{
let started = Instant::now();
let result = match tokio::time::timeout(command_timeout, future).await {
Ok(result) => result,
Err(_) => Err(RedisCacheError::Timeout(operation.to_string())),
};
let outcome = match &result {
Ok(_) => RedisCommandOutcome::Success,
Err(error) => crate::cache_redis::classify_redis_error(error),
};
recorder.record_command(operation, outcome, started.elapsed());
if let RedisCommandOutcome::Redirect(kind) = outcome {
recorder.record_redirect(kind);
}
result
}
/// Builds the `BreakerOpen` cache error for a rejected `operation`.
///
/// The message has the form `"<operation>: <error>"`.
pub(crate) fn redis_breaker_rejection(
    operation: &'static str,
    error: BreakerError,
) -> RedisCacheError {
    let message = format!("{}: {}", operation, error);
    RedisCacheError::BreakerOpen(message)
}
/// Converts a `redis::RedisError` into a `Backend` cache error holding its display text.
pub(crate) fn redis_error(error: redis::RedisError) -> RedisCacheError {
    let message = error.to_string();
    RedisCacheError::Backend(message)
}