use std::collections::BTreeMap;
use kevy_map::KevyMap;
use super::{EntryBatch, StreamData, StreamId};
pub(super) use super::claim::AutoclaimResult;
use crate::value::SmallBytes;
use crate::StoreError;
/// State of one consumer group: a delivery cursor, the group-wide
/// pending-entries list (PEL), and the consumers registered in the group.
pub struct ConsumerGroup {
/// Delivery cursor. `readgroup` with `ReadGroupId::New` starts reading at
/// `last_delivered_id.next()` and moves the cursor to the last ID it returned.
pub last_delivered_id: StreamId,
/// Pending entries keyed by stream ID; each value names the owning consumer.
pub pel: BTreeMap<StreamId, PelEntry>,
/// Consumers of this group, keyed by consumer name.
pub consumers: KevyMap<SmallBytes, Box<ConsumerState>>,
}
impl ConsumerGroup {
    /// Returns a copy of the group's delivery cursor.
    pub fn last_delivered_id(&self) -> StreamId {
        self.last_delivered_id
    }

    /// Returns the number of entries currently in the group's PEL.
    pub fn pending_count(&self) -> usize {
        self.pel.len()
    }

    /// Returns the number of consumers registered in the group.
    pub fn consumer_count(&self) -> usize {
        self.consumers.len()
    }

    /// Iterates over `(consumer name, consumer state)` pairs, in whatever
    /// order the underlying map yields them.
    pub fn consumers_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerState)> {
        self.consumers
            .iter()
            .map(|(name, state)| (name.as_slice(), &**state))
    }
}
impl ConsumerState {
/// Returns this consumer's `pel_count` counter.
pub fn pending_count(&self) -> usize {
self.pel_count
}
/// Returns the `last_seen_ms` timestamp stored for this consumer.
pub fn last_seen_ms(&self) -> u64 {
self.last_seen_ms
}
}
impl Default for ConsumerGroup {
    /// Builds an empty group: cursor at `StreamId::MIN`, empty PEL, no consumers.
    fn default() -> Self {
        let pel = BTreeMap::new();
        let consumers = KevyMap::default();
        Self {
            last_delivered_id: StreamId::MIN,
            pel,
            consumers,
        }
    }
}
/// One pending (delivered but not yet acknowledged) entry in a group's PEL.
#[derive(Clone, Debug)]
pub struct PelEntry {
/// Name of the consumer that currently owns the entry.
pub consumer: SmallBytes,
/// Timestamp (ms) of the most recent delivery; `record_deliveries` sets it
/// to the caller-supplied `now_ms`.
pub delivery_time_ms: u64,
/// Number of deliveries recorded for the entry (saturating counter).
pub delivery_count: u32,
}
/// Per-consumer bookkeeping stored inside a [`ConsumerGroup`].
#[derive(Clone, Debug)]
// NOTE(review): presumably silences an unused-field warning (likely `name`);
// confirm the allow is still needed.
#[allow(dead_code)]
pub struct ConsumerState {
/// Consumer name; the code in this file always sets it to the map key.
pub name: SmallBytes,
/// Timestamp (ms) set when the consumer is created and refreshed by `readgroup`.
pub last_seen_ms: u64,
/// Running count of PEL entries attributed to this consumer; adjusted by
/// `record_deliveries` and `ack`.
pub pel_count: usize,
}
/// Selects the value a group's delivery cursor is set to by `group_create`
/// and `group_setid`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GroupCreateMode {
/// Use the given ID as the cursor.
AtId(StreamId),
/// Use the stream's current `last_id` as the cursor.
AtCurrent,
}
/// Aggregate view of a group's PEL, as produced by `pending_summary`.
pub struct PendingSummary {
/// Total number of pending entries in the group.
pub total: u64,
/// Smallest and largest pending IDs, or `None` when the PEL is empty.
pub id_range: Option<(StreamId, StreamId)>,
/// `(consumer name, pending count)` pairs for consumers owning at least one
/// pending entry.
pub by_consumer: Vec<(Vec<u8>, u64)>,
}
/// Detailed PEL listing, as produced by `pending_extended`.
pub struct PendingExtended {
/// Matching pending entries in ascending ID order.
pub rows: Vec<PendingExtendedRow>,
}
/// One row of a [`PendingExtended`] listing.
pub struct PendingExtendedRow {
/// ID of the pending entry.
pub id: StreamId,
/// Name of the consumer that owns the entry.
pub consumer: Vec<u8>,
/// Milliseconds since the last delivery: `now_ms - delivery_time_ms`,
/// saturating at zero.
pub idle_ms: u64,
/// Delivery count copied from the PEL entry.
pub delivery_count: u32,
}
/// Options for a claim operation.
///
/// NOTE(review): nothing in this section reads these fields; the notes below
/// are inferred from the field names and should be confirmed against the
/// code that consumes them (presumably the `claim` module).
pub struct XClaimOpts {
/// Presumably the minimum idle time (ms) an entry must have to be claimed.
pub min_idle_ms: u64,
/// Presumably an explicit idle time (ms) to assign to claimed entries.
pub idle_override_ms: Option<u64>,
/// Presumably an explicit delivery timestamp (ms) to assign to claimed entries.
pub time_override_ms: Option<u64>,
/// Presumably an explicit delivery count to assign to claimed entries.
pub retrycount_override: Option<u32>,
/// Presumably allows claiming IDs that are not yet in the PEL — TODO confirm.
pub force: bool,
/// Presumably requests IDs only, without entry bodies — TODO confirm.
pub justid: bool,
}
impl StreamData {
/// Creates the consumer group `name` with its cursor chosen by `mode`.
///
/// Returns `Ok(false)` and leaves the stream untouched when a group with
/// that name already exists, `Ok(true)` once the new group is inserted.
/// This body never produces an `Err`.
pub fn group_create(
    &mut self,
    name: &[u8],
    mode: GroupCreateMode,
) -> Result<bool, StoreError> {
    let already_exists = self.groups.contains_key(name);
    if already_exists {
        return Ok(false);
    }
    let cursor = match mode {
        GroupCreateMode::AtCurrent => self.last_id,
        GroupCreateMode::AtId(id) => id,
    };
    let fresh = ConsumerGroup {
        last_delivered_id: cursor,
        pel: BTreeMap::new(),
        consumers: KevyMap::default(),
    };
    self.groups.insert(SmallBytes::from_slice(name), Box::new(fresh));
    Ok(true)
}
/// Removes the group `name`; returns `true` when a group was actually removed.
pub fn group_destroy(&mut self, name: &[u8]) -> bool {
    let removed = self.groups.remove(name);
    removed.is_some()
}
/// Moves the delivery cursor of group `name` to the position chosen by `mode`.
///
/// Returns `false` when the group does not exist, `true` after the update.
pub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool {
    // Resolve the target first; it is a plain read with no side effects.
    let target = match mode {
        GroupCreateMode::AtCurrent => self.last_id,
        GroupCreateMode::AtId(id) => id,
    };
    match self.groups.get_mut(name) {
        Some(found) => {
            found.last_delivered_id = target;
            true
        }
        None => false,
    }
}
/// Registers `consumer` in `group` with `last_seen_ms = now_ms` and no
/// pending entries.
///
/// Returns `false` when the group is missing or the consumer already exists,
/// `true` when a new consumer was inserted.
pub fn group_create_consumer(&mut self, group: &[u8], consumer: &[u8], now_ms: u64) -> bool {
    match self.groups.get_mut(group) {
        Some(g) if !g.consumers.contains_key(consumer) => {
            let state = ConsumerState {
                name: SmallBytes::from_slice(consumer),
                last_seen_ms: now_ms,
                pel_count: 0,
            };
            g.consumers
                .insert(SmallBytes::from_slice(consumer), Box::new(state));
            true
        }
        _ => false,
    }
}
/// Deletes `consumer` from `group`, discarding every PEL entry it owns.
///
/// Returns the number of PEL entries that were discarded; `0` when the group
/// does not exist.
pub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64 {
    if let Some(g) = self.groups.get_mut(group) {
        let before = g.pel.len();
        g.pel
            .retain(|_, pending| pending.consumer.as_slice() != consumer);
        let after = g.pel.len();
        g.consumers.remove(consumer);
        (before - after) as u64
    } else {
        0
    }
}
/// Reads entries on behalf of `consumer` in `group`.
///
/// The consumer is created if it does not exist yet and its `last_seen_ms`
/// is set to `now_ms` in either mode.
///
/// * `ReadGroupId::New` — returns entries starting at
///   `last_delivered_id.next()`, at most `count` of them (`None` = no limit).
///   Unless `noack` is set, every returned entry is recorded in the PEL for
///   this consumer. The group cursor moves to the last returned ID. When
///   nothing is available an empty batch is returned and nothing changes.
/// * `ReadGroupId::ReplayAfter(after)` — returns this consumer's own pending
///   entries starting at `after.next()`, at most `count` of them. PEL entries
///   whose stream entry no longer exists are skipped. Neither the PEL nor the
///   cursor is modified.
///
/// `count == Some(0)` yields an empty batch in both modes.
///
/// # Errors
///
/// Returns `StoreError::NoSuchKey` when `group` does not exist.
pub fn readgroup(
    &mut self,
    group: &[u8],
    consumer: &[u8],
    last_seen_arg: ReadGroupId,
    count: Option<usize>,
    noack: bool,
    now_ms: u64,
) -> Result<EntryBatch, StoreError> {
    let Some(g) = self.groups.get_mut(group) else {
        return Err(StoreError::NoSuchKey);
    };
    let consumer_smb = SmallBytes::from_slice(consumer);
    ensure_consumer(g, &consumer_smb, now_ms);
    if let Some(cs) = g.consumers.get_mut(consumer_smb.as_slice()) {
        cs.last_seen_ms = now_ms;
    }
    // `None` means "no limit"; applying the limit on the iterator keeps the
    // work proportional to what is actually returned.
    let limit = count.unwrap_or(usize::MAX);
    // Borrow only the entries field so it can be read while `g` (a borrow of
    // the groups field) is still alive.
    let entries = &self.entries;
    match last_seen_arg {
        ReadGroupId::New => {
            let start = g.last_delivered_id.next();
            let take: Vec<(StreamId, &[(SmallBytes, SmallBytes)])> = entries
                .range(start..=StreamId::MAX)
                .take(limit)
                .map(|(id, fv)| (*id, fv.as_slice()))
                .collect();
            let Some(&(last_id, _)) = take.last() else {
                return Ok(Vec::new());
            };
            if !noack {
                record_deliveries(g, &consumer_smb, &take, now_ms);
            }
            g.last_delivered_id = last_id;
            Ok(super::clone_entries(take))
        }
        ReadGroupId::ReplayAfter(after) => Ok(g
            .pel
            .range(after.next()..=StreamId::MAX)
            .filter(|(_, pending)| pending.consumer == consumer_smb)
            .filter_map(|(id, _)| {
                // A pending ID may refer to an entry that is no longer in
                // the stream; such IDs are skipped.
                entries.get(id).map(|fv| {
                    (
                        *id,
                        fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
                    )
                })
            })
            .take(limit)
            .collect()),
    }
}
/// Acknowledges `ids` in `group`, removing them from the PEL.
///
/// For each removed entry the owning consumer's `pel_count` is decremented
/// (saturating) if that consumer still exists. IDs that are not pending are
/// ignored. Returns the number of entries removed; `0` when the group does
/// not exist.
pub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64 {
    let Some(g) = self.groups.get_mut(group) else {
        return 0;
    };
    let mut acked = 0u64;
    for id in ids {
        let Some(released) = g.pel.remove(id) else {
            continue;
        };
        acked += 1;
        if let Some(owner) = g.consumers.get_mut(released.consumer.as_slice()) {
            owner.pel_count = owner.pel_count.saturating_sub(1);
        }
    }
    acked
}
pub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary> {
let g = self.groups.get(group)?;
let total = g.pel.len() as u64;
let id_range = match (g.pel.keys().next(), g.pel.keys().next_back()) {
(Some(lo), Some(hi)) => Some((*lo, *hi)),
_ => None,
};
let mut counts: Vec<(Vec<u8>, u64)> = Vec::new();
for p in g.pel.values() {
if let Some((_, n)) = counts.iter_mut().find(|(name, _)| name == p.consumer.as_slice()) {
*n += 1;
} else {
counts.push((p.consumer.to_vec(), 1));
}
}
Some(PendingSummary { total, id_range, by_consumer: counts })
}
/// Lists pending entries of `group` whose IDs lie in `start..=end`.
///
/// * `idle_min_ms` — when set, entries idle for less than this many
///   milliseconds (relative to `now_ms`) are skipped.
/// * `count` — maximum number of rows returned; `0` yields no rows.
/// * `consumer_filter` — when set, only entries owned by that consumer are
///   returned.
///
/// Rows come back in ascending ID order. Returns `None` when the group does
/// not exist. An inverted range (`start > end`) yields an empty listing.
// Flat argument list kept as-is so existing callers are unaffected.
#[allow(clippy::too_many_arguments)]
pub fn pending_extended(
    &self,
    group: &[u8],
    idle_min_ms: Option<u64>,
    start: StreamId,
    end: StreamId,
    count: usize,
    consumer_filter: Option<&[u8]>,
    now_ms: u64,
) -> Option<PendingExtended> {
    let g = self.groups.get(group)?;
    // `BTreeMap::range` panics when the start bound is greater than the end
    // bound, so an inverted range is answered here without calling it.
    if start > end {
        return Some(PendingExtended { rows: Vec::new() });
    }
    let mut rows = Vec::with_capacity(count.min(g.pel.len()));
    for (id, p) in g.pel.range(start..=end) {
        if rows.len() >= count {
            break;
        }
        let idle = now_ms.saturating_sub(p.delivery_time_ms);
        if let Some(min) = idle_min_ms
            && idle < min
        {
            continue;
        }
        if let Some(c) = consumer_filter
            && p.consumer.as_slice() != c
        {
            continue;
        }
        rows.push(PendingExtendedRow {
            id: *id,
            consumer: p.consumer.to_vec(),
            idle_ms: idle,
            delivery_count: p.delivery_count,
        });
    }
    Some(PendingExtended { rows })
}
}
/// Selects what `readgroup` returns.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReadGroupId {
/// Entries past the group's delivery cursor that have not been delivered yet.
New,
/// The calling consumer's own pending entries, starting at the ID that
/// follows the given one.
ReplayAfter(StreamId),
}
/// Makes sure `name` is registered in `g`.
///
/// An existing consumer is left untouched; otherwise a new one is inserted
/// with `last_seen_ms = now_ms` and `pel_count = 0`.
pub(super) fn ensure_consumer(g: &mut ConsumerGroup, name: &SmallBytes, now_ms: u64) {
    if g.consumers.contains_key(name.as_slice()) {
        return;
    }
    let state = ConsumerState {
        name: name.clone(),
        last_seen_ms: now_ms,
        pel_count: 0,
    };
    g.consumers.insert(name.clone(), Box::new(state));
}
fn record_deliveries(
g: &mut ConsumerGroup,
consumer: &SmallBytes,
entries: &[(StreamId, &[(SmallBytes, SmallBytes)])],
now_ms: u64,
) {
let mut new_for_consumer = 0usize;
for (id, _) in entries {
let entry = g.pel.entry(*id).or_insert_with(|| {
new_for_consumer += 1;
PelEntry {
consumer: consumer.clone(),
delivery_time_ms: now_ms,
delivery_count: 0,
}
});
if entry.consumer != *consumer {
if let Some(prev) = g.consumers.get_mut(entry.consumer.as_slice()) {
prev.pel_count = prev.pel_count.saturating_sub(1);
}
entry.consumer = consumer.clone();
new_for_consumer += 1;
}
entry.delivery_time_ms = now_ms;
entry.delivery_count = entry.delivery_count.saturating_add(1);
}
if let Some(cs) = g.consumers.get_mut(consumer.as_slice()) {
cs.pel_count = cs.pel_count.saturating_add(new_for_consumer);
}
}