use redis::streams::{StreamInfoGroupsReply, StreamPendingReply};
use redis::{Commands, Connection, RedisResult};
/// Snapshot of how far a consumer group has worked through a stream.
#[derive(Debug, Clone)]
pub struct StreamProgress {
    /// Total number of entries in the stream.
    total_entries: u64,
    /// Entries counted as pending for the group.
    pending_entries: u64,
    /// Entries counted as not yet delivered to the group.
    undelivered_entries: u64,
}

impl StreamProgress {
    /// Returns the number of entries still outstanding: pending plus undelivered.
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing.
    pub fn remaining_entries(&self) -> u64 {
        self.pending_entries
            .saturating_add(self.undelivered_entries)
    }

    /// Returns the share of the stream that has been processed, in percent.
    ///
    /// An empty stream (`total_entries == 0`) is reported as `100.0`.
    /// If the outstanding count exceeds the total, the result is clamped to
    /// `0.0` rather than underflowing.
    pub fn progress_percentage(&self) -> f64 {
        if self.total_entries == 0 {
            return 100.0;
        }
        // The counters are independent values, so `remaining` may be larger
        // than `total`; a plain `-` would panic (debug) or wrap (release).
        let processed = self.total_entries.saturating_sub(self.remaining_entries());
        (processed as f64 / self.total_entries as f64) * 100.0
    }
}
/// Collects progress counters for one consumer group of a stream.
///
/// Runs three commands on `con`, one after another:
/// 1. `xlen` on `stream_key` for the total entry count,
/// 2. `xpending` on `stream_key` / `group_name` for the pending count,
/// 3. `xinfo_groups` on `stream_key`, from which the `lag` of the group
///    named `group_name` is taken as the undelivered count.
///
/// If the group is not present in the `xinfo_groups` reply, or it reports no
/// `lag` value, the undelivered count is `0`.
///
/// The commands are issued separately, so the three counters are not
/// guaranteed to describe the same instant.
///
/// # Errors
///
/// Returns the first error produced by any of the three commands.
pub fn get_consumer_group_progress(
    con: &mut Connection,
    stream_key: &str,
    group_name: &str,
) -> RedisResult<StreamProgress> {
    let total_entries: u64 = con.xlen(stream_key)?;

    let pending_info: StreamPendingReply = con.xpending(stream_key, group_name)?;
    let pending_entries = match &pending_info {
        StreamPendingReply::Data(data) => data.count as u64,
        StreamPendingReply::Empty => 0,
    };

    let groups_info: StreamInfoGroupsReply = con.xinfo_groups(stream_key)?;
    // Only the matching group's lag is needed; stop at the first name match.
    let undelivered_entries = groups_info
        .groups
        .iter()
        .find(|group| group.name == group_name)
        .and_then(|group| group.lag)
        .map_or(0, |lag| lag as u64);

    Ok(StreamProgress {
        total_entries,
        pending_entries,
        undelivered_entries,
    })
}