use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};
use kevy_map::KevyMap;
use crate::value::*;
use crate::StoreError;
/// Identifier of a stream entry, made of a millisecond part and a sequence part.
///
/// The derived ordering compares `ms` first and `seq` second (field
/// declaration order); the derived `Default` is `0-0`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
pub struct StreamId {
/// Millisecond component (the number before the `-` in the textual form).
pub ms: u64,
/// Sequence component (the number after the `-` in the textual form).
pub seq: u64,
}
impl StreamId {
    /// Smallest representable ID, `0-0`.
    pub const MIN: StreamId = StreamId { ms: 0, seq: 0 };
    /// Largest representable ID (both components are `u64::MAX`).
    pub const MAX: StreamId = StreamId { ms: u64::MAX, seq: u64::MAX };

    /// Renders the ID as the ASCII bytes of `<ms>-<seq>`.
    pub fn encode(self) -> Vec<u8> {
        let StreamId { ms, seq } = self;
        format!("{}-{}", ms, seq).into_bytes()
    }

    /// Returns the ID that immediately follows `self` in sort order.
    ///
    /// The sequence is bumped first; when it is already at its maximum the
    /// millisecond part is bumped and the sequence restarts at 0. Calling
    /// this on [`StreamId::MAX`] returns [`StreamId::MAX`] again.
    pub fn next(self) -> Self {
        match (self.seq.checked_add(1), self.ms.checked_add(1)) {
            // Room left in the sequence: stay in the same millisecond.
            (Some(seq), _) => StreamId { ms: self.ms, seq },
            // Sequence exhausted: roll over into the next millisecond.
            (None, Some(ms)) => StreamId { ms, seq: 0 },
            // Both parts exhausted: saturate.
            (None, None) => StreamId::MAX,
        }
    }
}
/// How the ID of a new entry was specified by the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum XAddIdSpec {
/// Both parts are to be generated (textual form `*`).
AutoAll,
/// The millisecond part is given and the sequence is to be generated
/// (textual form `<ms>-*`).
AutoSeq(u64),
/// A fully specified ID (textual form `<ms>` or `<ms>-<seq>`).
Explicit(StreamId),
}
/// Parses the ID argument of an add operation.
///
/// Accepted forms:
/// * `*` gives [`XAddIdSpec::AutoAll`]
/// * `<ms>-*` gives [`XAddIdSpec::AutoSeq`]
/// * `<ms>-<seq>` gives [`XAddIdSpec::Explicit`]
/// * `<ms>` gives [`XAddIdSpec::Explicit`] with `seq` set to 0
///
/// # Errors
///
/// Returns [`StreamIdError::Invalid`] when the input is not UTF-8 or a
/// numeric part does not parse as `u64`.
pub fn parse_xadd_id(s: &[u8]) -> Result<XAddIdSpec, StreamIdError> {
    if s == b"*" {
        return Ok(XAddIdSpec::AutoAll);
    }
    let text = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
    // Every numeric component is parsed the same way and fails the same way.
    let number = |digits: &str| digits.parse::<u64>().map_err(|_| StreamIdError::Invalid);

    let spec = match text.split_once('-') {
        Some((ms_part, "*")) => XAddIdSpec::AutoSeq(number(ms_part)?),
        Some((ms_part, seq_part)) => {
            let ms = number(ms_part)?;
            let seq = number(seq_part)?;
            XAddIdSpec::Explicit(StreamId { ms, seq })
        }
        None => XAddIdSpec::Explicit(StreamId {
            ms: number(text)?,
            seq: 0,
        }),
    };
    Ok(spec)
}
/// Parses the lower bound of a range query.
///
/// The literal `-` stands for [`StreamId::MIN`]; any other input is handed
/// to [`parse_explicit_id`] with `end = false`.
pub fn parse_range_start(s: &[u8]) -> Result<StreamId, StreamIdError> {
    match s {
        b"-" => Ok(StreamId::MIN),
        explicit => parse_explicit_id(explicit, false),
    }
}
/// Parses the upper bound of a range query.
///
/// The literal `+` stands for [`StreamId::MAX`]; any other input is handed
/// to [`parse_explicit_id`] with `end = true`.
pub fn parse_range_end(s: &[u8]) -> Result<StreamId, StreamIdError> {
    match s {
        b"+" => Ok(StreamId::MAX),
        explicit => parse_explicit_id(explicit, true),
    }
}
/// Parses a range bound given as `<ms>` or `<ms>-<seq>`.
///
/// When only `<ms>` is given, the missing sequence is filled in so that the
/// bound covers the whole millisecond: `u64::MAX` when `end` is true, `0`
/// otherwise. When a `-` is present, both parts must be valid numbers; a
/// dangling separator such as `5-` is rejected rather than being treated
/// as "sequence omitted".
///
/// # Errors
///
/// Returns [`StreamIdError::Invalid`] when the input is not UTF-8 or a
/// numeric part does not parse as `u64`.
pub fn parse_explicit_id(s: &[u8], end: bool) -> Result<StreamId, StreamIdError> {
    let text = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
    let number = |digits: &str| digits.parse::<u64>().map_err(|_| StreamIdError::Invalid);

    match text.split_once('-') {
        Some((ms_part, seq_part)) => {
            let ms = number(ms_part)?;
            let seq = number(seq_part)?;
            Ok(StreamId { ms, seq })
        }
        None => {
            let ms = number(text)?;
            // No sequence supplied: widen the bound to the edge of the millisecond.
            let seq = if end { u64::MAX } else { 0 };
            Ok(StreamId { ms, seq })
        }
    }
}
/// Error returned by the stream ID parsers in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamIdError {
/// The input was not valid UTF-8 or a numeric part failed to parse as `u64`.
Invalid,
}
/// State of a single stream: its entries plus bookkeeping counters and
/// consumer groups.
#[derive(Default)]
pub struct StreamData {
/// Entries keyed by ID (the `BTreeMap` keeps them in ascending ID order);
/// each value is the entry's list of field/value pairs.
pub(super) entries: BTreeMap<StreamId, Vec<(SmallBytes, SmallBytes)>>,
/// ID of the most recent `insert`, or the value restored by `set_loaded_state`.
pub(super) last_id: StreamId,
/// Highest ID removed so far by `del_ids`, `trim_maxlen` or `trim_minid`.
pub(super) max_deleted_id: StreamId,
/// Incremented once per `insert`; not decremented when entries are removed.
pub(super) entries_added: u64,
/// Consumer groups, looked up by name (see `group`).
pub(super) groups: KevyMap<SmallBytes, Box<group::ConsumerGroup>>,
}
impl StreamData {
    /// Number of entries currently stored.
    pub fn length(&self) -> u64 {
        self.entries.len() as u64
    }

    /// ID recorded by the latest `insert` (or restored by `set_loaded_state`).
    pub fn last_id(&self) -> StreamId {
        self.last_id
    }

    /// Count of entries added through `insert` (or restored by `set_loaded_state`).
    pub fn entries_added(&self) -> u64 {
        self.entries_added
    }

    /// Highest ID removed so far by `del_ids`, `trim_maxlen` or `trim_minid`.
    pub fn max_deleted_id(&self) -> StreamId {
        self.max_deleted_id
    }

    /// Iterates over every entry in ascending ID order.
    pub fn iter_entries(
        &self,
    ) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])> {
        self.entries.iter().map(|(id, fv)| (*id, fv.as_slice()))
    }

    /// Entry with the smallest ID, if any.
    pub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
        self.entries.iter().next().map(|(id, fv)| (*id, fv.as_slice()))
    }

    /// Entry with the largest ID, if any.
    pub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
        self.entries.iter().next_back().map(|(id, fv)| (*id, fv.as_slice()))
    }

    /// Iterates over `(name, group)` pairs of all consumer groups.
    pub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &group::ConsumerGroup)> {
        self.groups.iter().map(|(k, v)| (k.as_slice(), v.as_ref()))
    }

    /// Looks up a consumer group by name.
    pub fn group(&self, name: &[u8]) -> Option<&group::ConsumerGroup> {
        self.groups.get(name).map(|b| b.as_ref())
    }

    /// Number of consumer groups attached to this stream.
    pub fn group_count(&self) -> usize {
        self.groups.len()
    }

    /// Stores an entry without touching `last_id` or `entries_added`.
    ///
    /// Pair with [`StreamData::set_loaded_state`] to restore those counters.
    pub fn load_entry(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
        self.entries.insert(id, fields);
    }

    /// Overwrites the bookkeeping counters with previously saved values.
    pub fn set_loaded_state(
        &mut self,
        last_id: StreamId,
        max_deleted_id: StreamId,
        entries_added: u64,
    ) {
        self.last_id = last_id;
        self.max_deleted_id = max_deleted_id;
        self.entries_added = entries_added;
    }

    /// Appends an entry and advances `last_id` and `entries_added`.
    ///
    /// Debug builds assert that `id` is greater than the current `last_id`
    /// (or that both are `0-0`).
    pub(crate) fn insert(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
        debug_assert!(id > self.last_id || (id == StreamId::MIN && self.last_id == StreamId::MIN));
        self.entries.insert(id, fields);
        self.last_id = id;
        self.entries_added += 1;
    }

    /// Turns an ID specification into the concrete ID a new entry would get.
    ///
    /// * `AutoAll`: uses `now_ms` with sequence 0 when `now_ms` is ahead of
    ///   `last_id.ms`; otherwise continues right after `last_id`, rolling
    ///   over into the next millisecond when the sequence is exhausted.
    /// * `AutoSeq(ms)`: sequence 0 for a millisecond newer than `last_id.ms`,
    ///   `last_id.seq + 1` for the same millisecond.
    /// * `Explicit(id)`: `id` itself, which must be greater than `last_id`.
    ///
    /// This method does not modify the stream.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::OutOfRange` when no ID greater than `last_id`
    /// satisfies the specification, including the case where the sequence
    /// (or the whole ID space) is exhausted.
    pub fn resolve_xadd_id(
        &self,
        spec: XAddIdSpec,
        now_ms: u64,
    ) -> Result<StreamId, StoreError> {
        let last = self.last_id;
        match spec {
            XAddIdSpec::AutoAll => {
                if now_ms > last.ms {
                    return Ok(StreamId { ms: now_ms, seq: 0 });
                }
                // Clock is at or behind the last ID: continue from the last ID.
                // Checked arithmetic avoids an overflow panic or a wrapped,
                // non-increasing ID when the sequence is already at u64::MAX.
                match (last.seq.checked_add(1), last.ms.checked_add(1)) {
                    (Some(seq), _) => Ok(StreamId { ms: last.ms, seq }),
                    (None, Some(ms)) => Ok(StreamId { ms, seq: 0 }),
                    (None, None) => Err(StoreError::OutOfRange),
                }
            }
            XAddIdSpec::AutoSeq(ms) => {
                if ms < last.ms {
                    Err(StoreError::OutOfRange)
                } else if ms == last.ms {
                    // The millisecond is pinned by the caller, so an
                    // exhausted sequence cannot roll over.
                    last.seq
                        .checked_add(1)
                        .map(|seq| StreamId { ms, seq })
                        .ok_or(StoreError::OutOfRange)
                } else {
                    Ok(StreamId { ms, seq: 0 })
                }
            }
            XAddIdSpec::Explicit(id) => {
                // `0-0` is always rejected here because it can never exceed `last_id`.
                if id <= last {
                    Err(StoreError::OutOfRange)
                } else {
                    Ok(id)
                }
            }
        }
    }

    /// Entries with `start <= id <= end` in ascending order, at most `count`
    /// of them when a limit is given.
    ///
    /// Returns an empty vector when `start > end`.
    pub fn range(
        &self,
        start: StreamId,
        end: StreamId,
        count: Option<usize>,
    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
        // `BTreeMap::range` panics on an inverted range, so answer it here.
        if start > end {
            return Vec::new();
        }
        let iter = self.entries.range(start..=end).map(|(id, fv)| (*id, fv.as_slice()));
        match count {
            Some(n) => iter.take(n).collect(),
            None => iter.collect(),
        }
    }

    /// Entries with `start <= id <= end` in descending order, at most `count`
    /// of them when a limit is given.
    ///
    /// `start` is still the lower bound and `end` the upper bound. Returns an
    /// empty vector when `start > end`.
    pub fn revrange(
        &self,
        start: StreamId,
        end: StreamId,
        count: Option<usize>,
    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
        // `BTreeMap::range` panics on an inverted range, so answer it here.
        if start > end {
            return Vec::new();
        }
        let iter = self.entries.range(start..=end).rev().map(|(id, fv)| (*id, fv.as_slice()));
        match count {
            Some(n) => iter.take(n).collect(),
            None => iter.collect(),
        }
    }

    /// Entries with an ID strictly greater than `last_seen`, ascending, at
    /// most `count` of them when a limit is given.
    pub fn read_after(
        &self,
        last_seen: StreamId,
        count: Option<usize>,
    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
        // Nothing can follow the maximum ID, and `next()` saturates there.
        if last_seen == StreamId::MAX {
            return Vec::new();
        }
        self.range(last_seen.next(), StreamId::MAX, count)
    }

    /// Removes the listed entries and returns how many were actually present.
    ///
    /// `max_deleted_id` is raised to the highest ID that was removed.
    pub(crate) fn del_ids(&mut self, ids: &[StreamId]) -> usize {
        let mut removed = 0usize;
        for id in ids {
            if self.entries.remove(id).is_some() {
                removed += 1;
                self.max_deleted_id = self.max_deleted_id.max(*id);
            }
        }
        removed
    }

    /// Drops the oldest entries until at most `n` remain; returns the number
    /// of entries removed.
    pub(crate) fn trim_maxlen(&mut self, n: usize) -> usize {
        let excess = self.entries.len().saturating_sub(n);
        if excess == 0 {
            return 0;
        }
        // Keys come out in ascending order, so these are the oldest entries.
        let drop_ids: Vec<StreamId> = self.entries.keys().copied().take(excess).collect();
        for id in &drop_ids {
            self.entries.remove(id);
        }
        // The last collected key is the highest one removed.
        if let Some(&highest) = drop_ids.last() {
            self.max_deleted_id = self.max_deleted_id.max(highest);
        }
        drop_ids.len()
    }

    /// Estimated size of the stored entries.
    ///
    /// Each entry counts `BTREE_SLOT_BYTES + 24`, and each field/value pair
    /// counts 48 plus the `heap_bytes()` of both halves. Consumer groups are
    /// not included.
    pub fn weight(&self) -> u64 {
        let entry_sum: u64 = self
            .entries
            .values()
            .map(|fv| {
                24 + fv
                    .iter()
                    .map(|(f, v)| 48 + f.heap_bytes() as u64 + v.heap_bytes() as u64)
                    .sum::<u64>()
            })
            .sum();
        (self.entries.len() as u64).saturating_mul(BTREE_SLOT_BYTES) + entry_sum
    }

    /// Drops every entry whose ID is strictly below `floor`; returns the
    /// number of entries removed.
    pub(crate) fn trim_minid(&mut self, floor: StreamId) -> usize {
        let drop_ids: Vec<StreamId> = self.entries.range(..floor).map(|(id, _)| *id).collect();
        for id in &drop_ids {
            self.entries.remove(id);
        }
        // The range is ascending, so the last collected key is the highest removed.
        if let Some(&highest) = drop_ids.last() {
            self.max_deleted_id = self.max_deleted_id.max(highest);
        }
        drop_ids.len()
    }
}
mod claim;
mod group;
mod store;
#[allow(unused_imports)]
pub use claim::AutoclaimResult;
#[allow(unused_imports)]
pub use group::{
ConsumerGroup, ConsumerState, GroupCreateMode, PelEntry, PendingExtended,
PendingExtendedRow, PendingSummary, ReadGroupId, XClaimOpts,
};
pub use store::EntryBatch;
/// Plain-data form of one stream entry: two `u64` values followed by a list
/// of byte-string pairs.
// NOTE(review): presumably `(ms, seq, field/value pairs)` — confirm against the loader that produces it.
pub type LoadedStreamEntry = (u64, u64, Vec<(Vec<u8>, Vec<u8>)>);
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
pub fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Estimated size of a single stream entry.
///
/// Counts `BTREE_SLOT_BYTES + 24` for the entry itself, plus 48 and the
/// `heap_bytes()` of both halves for every field/value pair.
pub(super) fn stream_entry_weight(fields: &[(SmallBytes, SmallBytes)]) -> u64 {
    let mut pairs_total = 0u64;
    for (field, value) in fields {
        pairs_total += 48 + field.heap_bytes() as u64 + value.heap_bytes() as u64;
    }
    BTREE_SLOT_BYTES + 24 + pairs_total
}
/// Copies borrowed entries into an owned [`EntryBatch`].
///
/// Every field and value is duplicated with `to_vec()`, so the result does
/// not borrow from the stream the entries came from.
pub(super) fn clone_entries(
    src: Vec<(StreamId, &[(SmallBytes, SmallBytes)])>,
) -> EntryBatch {
    src.into_iter()
        .map(|(id, pairs)| {
            let owned_pairs = pairs
                .iter()
                .map(|(field, value)| (field.to_vec(), value.to_vec()))
                .collect();
            (id, owned_pairs)
        })
        .collect()
}