use crate::autoscale_metrics::AutoscaleMetrics;
use crate::backend::{AutoscalerBackendImpl, QueueStatsProviderImpl};
use crate::error::{Result, ShoveError};
use super::client::RedisClient;
/// Point-in-time message counts for a single Redis stream queue.
///
/// `Default` yields all-zero counts. `PartialEq`/`Eq` are derived so that whole
/// snapshots can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RedisQueueStats {
    /// Messages counted as waiting to be processed.
    pub messages_ready: u64,
    /// Messages counted as delivered but still pending.
    pub messages_in_flight: u64,
}
/// Reads per-queue message counts from Redis through a [`RedisClient`].
#[derive(Clone)]
pub struct RedisQueueStatsProvider {
/// Client used to obtain connections and the group name for the stats queries.
client: RedisClient,
}
impl RedisQueueStatsProvider {
    /// Creates a stats provider that issues its queries through `client`.
    pub fn new(client: RedisClient) -> Self {
        Self { client }
    }

    /// Fetches the current ready / in-flight message counts for the stream `queue`.
    ///
    /// Issues `XLEN <queue>` and `XPENDING <queue> <group>` concurrently, each on its
    /// own connection obtained from the client. The in-flight count is taken from the
    /// `XPENDING` reply; the ready count is the stream length minus the in-flight
    /// count, clamped at zero.
    ///
    /// # Errors
    ///
    /// Propagates the error from `multiplexed_conn` when a connection cannot be
    /// obtained, and returns [`ShoveError::Connection`] when either command fails.
    pub async fn get_queue_stats(&self, queue: &str) -> Result<RedisQueueStats> {
        let group = self.client.group().to_owned();
        let len_client = self.client.clone();
        let pending_client = self.client.clone();

        // `queue` is a `&str` (which is `Copy`), so both `async move` blocks can
        // capture it without an extra owned copy; `group` is moved into the
        // only block that needs it.
        let (stream_len, pending_reply) = tokio::try_join!(
            async move {
                let mut conn = len_client.multiplexed_conn().await?;
                conn.query::<u64>(redis::cmd("XLEN").arg(queue))
                    .await
                    .map_err(|e| ShoveError::Connection(format!("XLEN failed: {e}")))
            },
            async move {
                let mut conn = pending_client.multiplexed_conn().await?;
                conn.query::<redis::Value>(redis::cmd("XPENDING").arg(queue).arg(&group))
                    .await
                    .map_err(|e| ShoveError::Connection(format!("XPENDING failed: {e}")))
            }
        )?;

        let messages_in_flight = Self::pending_count(&pending_reply);

        // The two commands are not atomic with respect to each other, so the pending
        // count can momentarily exceed the observed length; saturate instead of
        // underflowing.
        // NOTE(review): XLEN counts every entry still stored in the stream, so this
        // presumably relies on consumers removing entries once acknowledged — confirm.
        let messages_ready = stream_len.saturating_sub(messages_in_flight);

        Ok(RedisQueueStats {
            messages_ready,
            messages_in_flight,
        })
    }

    /// Extracts the pending-entry count from an `XPENDING` summary reply.
    ///
    /// The count is read from the first element of the reply array. Any other reply
    /// shape — a non-array, an empty array, a non-integer first element, or a
    /// negative integer — is treated as zero pending entries.
    fn pending_count(reply: &redis::Value) -> u64 {
        match reply {
            redis::Value::Array(parts) => match parts.first() {
                // Checked conversion: a plain `as u64` would wrap a negative value
                // into an enormous count.
                Some(redis::Value::Int(n)) => u64::try_from(*n).unwrap_or(0),
                _ => 0,
            },
            _ => 0,
        }
    }
}
/// Redis-flavoured autoscaler backend that wraps a [`RedisClient`].
#[derive(Clone)]
pub struct RedisAutoscalerBackend {
/// Client supplied to `new`. The leading underscore signals that the code visible
/// here never reads it.
_client: RedisClient,
}
impl RedisAutoscalerBackend {
pub fn new(client: RedisClient) -> Self {
Self { _client: client }
}
}
// Empty impl: nothing is overridden here, so the backend relies entirely on
// whatever `AutoscalerBackendImpl` itself provides.
impl AutoscalerBackendImpl for RedisAutoscalerBackend {}
impl QueueStatsProviderImpl for RedisQueueStatsProvider {
async fn snapshot(&self, queue: &str) -> Result<AutoscaleMetrics> {
let stats = self.get_queue_stats(queue).await?;
Ok(AutoscaleMetrics {
backlog: Some(stats.messages_ready),
inflight: Some(stats.messages_in_flight),
throughput_per_sec: None,
processing_latency: None,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-local parser for an `XPENDING` summary reply: yields the first array
    /// element when it is an integer, and 0 for every other reply shape.
    fn parse_xpending_in_flight(reply: &redis::Value) -> u64 {
        let redis::Value::Array(parts) = reply else {
            return 0;
        };
        match parts.first() {
            Some(redis::Value::Int(n)) => *n as u64,
            _ => 0,
        }
    }

    // --- RedisQueueStats construction -------------------------------------

    #[test]
    fn queue_stats_populates_messages_ready() {
        let RedisQueueStats {
            messages_ready,
            messages_in_flight,
        } = RedisQueueStats {
            messages_ready: 42,
            messages_in_flight: 3,
        };
        assert_eq!(messages_ready, 42);
        assert_eq!(messages_in_flight, 3);
    }

    #[test]
    fn queue_stats_default_is_zero() {
        let zeroed = RedisQueueStats::default();
        assert_eq!((zeroed.messages_ready, zeroed.messages_in_flight), (0, 0));
    }

    // --- ready-count arithmetic -------------------------------------------

    #[test]
    fn saturating_sub_prevents_underflow() {
        // More in-flight entries than the stream length must clamp to zero.
        let (stream_len, in_flight): (u64, u64) = (5, 10);
        assert_eq!(stream_len.saturating_sub(in_flight), 0);
    }

    #[test]
    fn messages_ready_is_stream_len_minus_in_flight() {
        let (stream_len, in_flight): (u64, u64) = (20, 5);
        assert_eq!(stream_len.saturating_sub(in_flight), 15);
    }

    // --- XPENDING reply parsing -------------------------------------------

    #[test]
    fn xpending_reply_extracts_in_flight_count() {
        let summary = vec![
            redis::Value::Int(7),
            redis::Value::BulkString(b"1-0".to_vec()),
            redis::Value::BulkString(b"99-0".to_vec()),
        ];
        let reply = redis::Value::Array(summary);
        assert_eq!(parse_xpending_in_flight(&reply), 7);
    }

    #[test]
    fn xpending_empty_array_returns_zero() {
        let reply = redis::Value::Array(Vec::new());
        assert_eq!(parse_xpending_in_flight(&reply), 0);
    }

    #[test]
    fn xpending_non_int_first_element_returns_zero() {
        let first = redis::Value::BulkString(b"unexpected".to_vec());
        let reply = redis::Value::Array(vec![first]);
        assert_eq!(parse_xpending_in_flight(&reply), 0);
    }

    #[test]
    fn xpending_nil_reply_returns_zero() {
        let reply = redis::Value::Nil;
        assert_eq!(parse_xpending_in_flight(&reply), 0);
    }
}