use super::group::{AutoclaimResult, ReadGroupId};
use super::{
GroupCreateMode, PendingExtended, PendingSummary, StreamData, StreamId, XAddIdSpec, XClaimOpts,
};
use crate::value::*;
use crate::{Entry, Store, StoreError};
pub type EntryBatch = Vec<(StreamId, Vec<(Vec<u8>, Vec<u8>)>)>;
impl Store {
fn stream_mut(
&mut self,
key: &[u8],
create: bool,
) -> Result<Option<&mut StreamData>, StoreError> {
if self.live_entry_mut(key).is_none() {
if !create {
return Ok(None);
}
self.insert_entry(
SmallBytes::from_slice(key),
Entry::new(Value::Stream(Box::default()), None),
);
}
match &mut self.map.get_mut(key).expect("present").value {
Value::Stream(s) => Ok(Some(s)),
_ => Err(StoreError::WrongType),
}
}
fn stream_ref(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
match self.live_entry(key) {
None => Ok(None),
Some(e) => match &e.value {
Value::Stream(s) => Ok(Some(s.as_ref())),
_ => Err(StoreError::WrongType),
},
}
}
pub fn stream_view(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
self.stream_ref(key)
}
pub fn xadd(
&mut self,
key: &[u8],
spec: XAddIdSpec,
fields: Vec<(Vec<u8>, Vec<u8>)>,
nomkstream: bool,
now_ms: u64,
) -> Result<Option<StreamId>, StoreError> {
if nomkstream && self.live_entry(key).is_none() {
return Ok(None);
}
let id;
let weight_delta;
{
let s = self.stream_mut(key, true)?.expect("created");
id = s.resolve_xadd_id(spec, now_ms)?;
let smb_fields: Vec<(SmallBytes, SmallBytes)> = fields
.into_iter()
.map(|(f, v)| (SmallBytes::from_slice(&f), SmallBytes::from_slice(&v)))
.collect();
weight_delta = super::stream_entry_weight(&smb_fields);
s.insert(id, smb_fields);
}
self.bump_if_watched(key);
self.account_delta(key, weight_delta as i64);
Ok(Some(id))
}
pub fn xlen(&mut self, key: &[u8]) -> Result<u64, StoreError> {
Ok(self.stream_ref(key)?.map_or(0, |s| s.length()))
}
pub fn xrange(
&mut self,
key: &[u8],
start: StreamId,
end: StreamId,
count: Option<usize>,
) -> Result<EntryBatch, StoreError> {
Ok(self
.stream_ref(key)?
.map_or_else(Vec::new, |s| super::clone_entries(s.range(start, end, count))))
}
pub fn xrevrange(
&mut self,
key: &[u8],
start: StreamId,
end: StreamId,
count: Option<usize>,
) -> Result<EntryBatch, StoreError> {
Ok(self
.stream_ref(key)?
.map_or_else(Vec::new, |s| super::clone_entries(s.revrange(start, end, count))))
}
pub fn xread(
&mut self,
key: &[u8],
last_seen: StreamId,
count: Option<usize>,
) -> Result<EntryBatch, StoreError> {
Ok(self
.stream_ref(key)?
.map_or_else(Vec::new, |s| super::clone_entries(s.read_after(last_seen, count))))
}
pub fn xread_dollar_last_id(&mut self, key: &[u8]) -> Result<StreamId, StoreError> {
Ok(self.stream_ref(key)?.map_or(StreamId::MIN, |s| s.last_id()))
}
pub fn xdel(&mut self, key: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
let n;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(0);
};
n = s.del_ids(ids);
}
if n > 0 {
self.bump_if_watched(key);
self.reweigh_entry(key);
}
Ok(n as u64)
}
pub fn xtrim_maxlen(&mut self, key: &[u8], maxlen: u64) -> Result<u64, StoreError> {
let n;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(0);
};
n = s.trim_maxlen(maxlen as usize);
}
if n > 0 {
self.bump_if_watched(key);
self.reweigh_entry(key);
}
Ok(n as u64)
}
pub fn xtrim_minid(&mut self, key: &[u8], minid: StreamId) -> Result<u64, StoreError> {
let n;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(0);
};
n = s.trim_minid(minid);
}
if n > 0 {
self.bump_if_watched(key);
self.reweigh_entry(key);
}
Ok(n as u64)
}
pub fn xsetid(
&mut self,
key: &[u8],
last_id: StreamId,
entries_added: Option<u64>,
max_deleted_id: Option<StreamId>,
) -> Result<(), StoreError> {
{
let Some(s) = self.stream_mut(key, false)? else {
return Err(StoreError::NoSuchKey);
};
s.xsetid(last_id, entries_added, max_deleted_id)?;
}
self.bump_if_watched(key);
Ok(())
}
pub fn xgroup_create(
&mut self,
key: &[u8],
group: &[u8],
mode: GroupCreateMode,
mkstream: bool,
) -> Result<bool, StoreError> {
let exists = self.live_entry(key).is_some();
if !exists && !mkstream {
return Err(StoreError::NoSuchKey);
}
let s = self.stream_mut(key, true)?.expect("created");
let created = s.group_create(group, mode)?;
self.bump_if_watched(key);
self.reweigh_entry(key);
Ok(created)
}
pub fn xgroup_destroy(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
let dropped;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(false);
};
dropped = s.group_destroy(group);
}
if dropped {
self.bump_if_watched(key);
self.reweigh_entry(key);
}
Ok(dropped)
}
pub fn xgroup_setid(
&mut self,
key: &[u8],
group: &[u8],
mode: GroupCreateMode,
) -> Result<bool, StoreError> {
let touched;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(false);
};
touched = s.group_setid(group, mode);
}
if touched {
self.bump_if_watched(key);
}
Ok(touched)
}
pub fn xgroup_create_consumer(
&mut self,
key: &[u8],
group: &[u8],
consumer: &[u8],
now_ms: u64,
) -> Result<bool, StoreError> {
let Some(s) = self.stream_mut(key, false)? else {
return Ok(false);
};
Ok(s.group_create_consumer(group, consumer, now_ms))
}
pub fn xgroup_del_consumer(
&mut self,
key: &[u8],
group: &[u8],
consumer: &[u8],
) -> Result<u64, StoreError> {
let Some(s) = self.stream_mut(key, false)? else {
return Ok(0);
};
Ok(s.group_del_consumer(group, consumer))
}
#[allow(clippy::too_many_arguments)]
pub fn xreadgroup(
&mut self,
key: &[u8],
group: &[u8],
consumer: &[u8],
last_seen: ReadGroupId,
count: Option<usize>,
noack: bool,
now_ms: u64,
) -> Result<EntryBatch, StoreError> {
let result;
{
let Some(s) = self.stream_mut(key, false)? else {
return Err(StoreError::NoSuchKey);
};
result = s.readgroup(group, consumer, last_seen, count, noack, now_ms)?;
}
if !result.is_empty() {
self.bump_if_watched(key);
}
Ok(result)
}
pub fn xreadgroup_has_new(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
Ok(self
.stream_ref(key)?
.and_then(|s| s.group(group).map(|g| s.last_id() > g.last_delivered_id()))
.unwrap_or(false))
}
pub fn xack(&mut self, key: &[u8], group: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
let n;
{
let Some(s) = self.stream_mut(key, false)? else {
return Ok(0);
};
n = s.ack(group, ids);
}
if n > 0 {
self.bump_if_watched(key);
}
Ok(n)
}
pub fn xpending_summary(
&mut self,
key: &[u8],
group: &[u8],
) -> Result<Option<PendingSummary>, StoreError> {
Ok(self.stream_ref(key)?.and_then(|s| s.pending_summary(group)))
}
#[allow(clippy::too_many_arguments)]
pub fn xpending_extended(
&mut self,
key: &[u8],
group: &[u8],
idle_min_ms: Option<u64>,
start: StreamId,
end: StreamId,
count: usize,
consumer_filter: Option<&[u8]>,
now_ms: u64,
) -> Result<Option<PendingExtended>, StoreError> {
Ok(self.stream_ref(key)?.and_then(|s| {
s.pending_extended(group, idle_min_ms, start, end, count, consumer_filter, now_ms)
}))
}
pub fn xclaim(
&mut self,
key: &[u8],
group: &[u8],
new_owner: &[u8],
ids: &[StreamId],
opts: &XClaimOpts,
now_ms: u64,
) -> Result<EntryBatch, StoreError> {
let claimed;
let payloads;
{
let Some(s) = self.stream_mut(key, false)? else {
return Err(StoreError::NoSuchKey);
};
claimed = s.claim(group, new_owner, ids, opts, now_ms)?;
payloads = s.payloads_for(&claimed);
}
if !claimed.is_empty() {
self.bump_if_watched(key);
}
Ok(payloads)
}
#[allow(clippy::too_many_arguments)]
pub fn xautoclaim(
&mut self,
key: &[u8],
group: &[u8],
new_owner: &[u8],
min_idle_ms: u64,
start: StreamId,
count: usize,
justid: bool,
now_ms: u64,
) -> Result<(StreamId, EntryBatch, Vec<StreamId>), StoreError> {
let payloads;
let next_cursor;
let deleted_ids;
{
let Some(s) = self.stream_mut(key, false)? else {
return Err(StoreError::NoSuchKey);
};
let AutoclaimResult { next_cursor: nc, claimed_ids, deleted_ids: di } =
s.autoclaim(group, new_owner, min_idle_ms, start, count, justid, now_ms)?;
payloads = s.payloads_for(&claimed_ids);
next_cursor = nc;
deleted_ids = di;
}
if !payloads.is_empty() {
self.bump_if_watched(key);
}
Ok((next_cursor, payloads, deleted_ids))
}
}