1use std::collections::BTreeMap;
12#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use kevy_map::KevyMap;
16
17use crate::value::{SmallBytes, BTREE_SLOT_BYTES};
18use crate::StoreError;
19
20#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
26pub struct StreamId {
27 pub ms: u64,
29 pub seq: u64,
31}
32
33impl StreamId {
34 pub const MIN: StreamId = StreamId { ms: 0, seq: 0 };
36 pub const MAX: StreamId = StreamId { ms: u64::MAX, seq: u64::MAX };
38
39 pub fn encode(self) -> Vec<u8> {
41 format!("{}-{}", self.ms, self.seq).into_bytes()
42 }
43
44 #[must_use]
46 pub fn next(self) -> Self {
47 if self.seq < u64::MAX {
48 StreamId { ms: self.ms, seq: self.seq + 1 }
49 } else if self.ms < u64::MAX {
50 StreamId { ms: self.ms + 1, seq: 0 }
51 } else {
52 StreamId::MAX
53 }
54 }
55}
56
57#[derive(Clone, Copy, Debug, Eq, PartialEq)]
60pub enum XAddIdSpec {
61 AutoAll,
63 AutoSeq(u64),
65 Explicit(StreamId),
67}
68
69pub fn parse_xadd_id(s: &[u8]) -> Result<XAddIdSpec, StreamIdError> {
71 if s == b"*" {
72 return Ok(XAddIdSpec::AutoAll);
73 }
74 let txt = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
75 match txt.split_once('-') {
76 None => {
77 let ms = txt.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
78 Ok(XAddIdSpec::Explicit(StreamId { ms, seq: 0 }))
79 }
80 Some((ms_s, seq_s)) => {
81 let ms = ms_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
82 if seq_s == "*" {
83 Ok(XAddIdSpec::AutoSeq(ms))
84 } else {
85 let seq = seq_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
86 Ok(XAddIdSpec::Explicit(StreamId { ms, seq }))
87 }
88 }
89 }
90}
91
92pub fn parse_range_start(s: &[u8]) -> Result<StreamId, StreamIdError> {
95 if s == b"-" {
96 return Ok(StreamId::MIN);
97 }
98 parse_explicit_id(s, false)
99}
100
101pub fn parse_range_end(s: &[u8]) -> Result<StreamId, StreamIdError> {
104 if s == b"+" {
105 return Ok(StreamId::MAX);
106 }
107 parse_explicit_id(s, true)
108}
109
110pub fn parse_explicit_id(s: &[u8], end: bool) -> Result<StreamId, StreamIdError> {
114 let txt = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
115 let (ms_s, seq_s) = match txt.split_once('-') {
116 Some(p) => p,
117 None => (txt, if end { "" } else { "0" }),
118 };
119 let ms = ms_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
120 let seq = if seq_s.is_empty() {
121 u64::MAX
122 } else {
123 seq_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?
124 };
125 Ok(StreamId { ms, seq })
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq)]
132pub enum StreamIdError {
133 Invalid,
135}
136
137#[derive(Default, Clone)]
144pub struct StreamData {
145 pub(super) entries: BTreeMap<StreamId, Vec<(SmallBytes, SmallBytes)>>,
147 pub(super) last_id: StreamId,
150 pub(super) max_deleted_id: StreamId,
153 pub(super) entries_added: u64,
156 pub(super) groups: KevyMap<SmallBytes, Box<group::ConsumerGroup>>,
159}
160
161impl StreamData {
162 pub fn length(&self) -> u64 {
164 self.entries.len() as u64
165 }
166
167 pub fn last_id(&self) -> StreamId {
170 self.last_id
171 }
172
173 pub fn entries_added(&self) -> u64 {
175 self.entries_added
176 }
177
178 pub fn max_deleted_id(&self) -> StreamId {
179 self.max_deleted_id
180 }
181
182 pub fn iter_entries(
185 &self,
186 ) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])> {
187 self.entries.iter().map(|(id, fv)| (*id, fv.as_slice()))
188 }
189
190 pub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
192 self.entries.iter().next().map(|(id, fv)| (*id, fv.as_slice()))
193 }
194
195 pub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
197 self.entries.iter().next_back().map(|(id, fv)| (*id, fv.as_slice()))
198 }
199
200 pub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &group::ConsumerGroup)> {
202 self.groups.iter().map(|(k, v)| (k.as_slice(), v.as_ref()))
203 }
204
205 pub fn group(&self, name: &[u8]) -> Option<&group::ConsumerGroup> {
207 self.groups.get(name).map(std::convert::AsRef::as_ref)
208 }
209
210 pub fn group_count(&self) -> usize {
212 self.groups.len()
213 }
214
215 pub fn load_entry(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
219 self.entries.insert(id, fields);
220 }
221
222 pub fn set_loaded_state(
225 &mut self,
226 last_id: StreamId,
227 max_deleted_id: StreamId,
228 entries_added: u64,
229 ) {
230 self.last_id = last_id;
231 self.max_deleted_id = max_deleted_id;
232 self.entries_added = entries_added;
233 }
234
235 pub(crate) fn insert(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
238 debug_assert!(id > self.last_id || (id == StreamId::MIN && self.last_id == StreamId::MIN));
239 self.entries.insert(id, fields);
240 self.last_id = id;
241 self.entries_added += 1;
242 }
243
244 pub fn resolve_xadd_id(
248 &self,
249 spec: XAddIdSpec,
250 now_ms: u64,
251 ) -> Result<StreamId, StoreError> {
252 let candidate = match spec {
253 XAddIdSpec::AutoAll => {
254 let ms = now_ms.max(self.last_id.ms);
255 if ms == self.last_id.ms {
256 StreamId { ms, seq: self.last_id.seq + 1 }
257 } else {
258 StreamId { ms, seq: 0 }
259 }
260 }
261 XAddIdSpec::AutoSeq(ms) => {
262 if ms < self.last_id.ms {
263 return Err(StoreError::OutOfRange);
264 }
265 if ms == self.last_id.ms {
266 StreamId { ms, seq: self.last_id.seq + 1 }
267 } else {
268 StreamId { ms, seq: 0 }
269 }
270 }
271 XAddIdSpec::Explicit(id) => {
272 if id <= self.last_id {
273 return Err(StoreError::OutOfRange);
274 }
275 if id == StreamId::MIN {
276 return Err(StoreError::OutOfRange);
277 }
278 id
279 }
280 };
281 Ok(candidate)
282 }
283
284 pub fn range(
286 &self,
287 start: StreamId,
288 end: StreamId,
289 count: Option<usize>,
290 ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
291 let iter = self.entries.range(start..=end).map(|(id, fv)| (*id, fv.as_slice()));
292 match count {
293 Some(n) => iter.take(n).collect(),
294 None => iter.collect(),
295 }
296 }
297
298 pub fn revrange(
300 &self,
301 start: StreamId,
302 end: StreamId,
303 count: Option<usize>,
304 ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
305 let iter = self.entries.range(start..=end).rev().map(|(id, fv)| (*id, fv.as_slice()));
306 match count {
307 Some(n) => iter.take(n).collect(),
308 None => iter.collect(),
309 }
310 }
311
312 pub fn read_after(
314 &self,
315 last_seen: StreamId,
316 count: Option<usize>,
317 ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
318 if last_seen == StreamId::MAX {
319 return Vec::new();
320 }
321 self.range(last_seen.next(), StreamId::MAX, count)
322 }
323
324 pub(crate) fn del_ids(&mut self, ids: &[StreamId]) -> usize {
328 let mut removed = 0usize;
329 for id in ids {
330 if self.entries.remove(id).is_some() {
331 removed += 1;
332 if *id > self.max_deleted_id {
333 self.max_deleted_id = *id;
334 }
335 }
336 }
337 removed
338 }
339
340 pub(crate) fn trim_maxlen(&mut self, n: usize) -> usize {
342 let len = self.entries.len();
343 if len <= n {
344 return 0;
345 }
346 let drop = len - n;
347 let mut removed = 0;
348 let drop_ids: Vec<StreamId> = self.entries.keys().copied().take(drop).collect();
349 for id in drop_ids {
350 self.entries.remove(&id);
351 if id > self.max_deleted_id {
352 self.max_deleted_id = id;
353 }
354 removed += 1;
355 }
356 removed
357 }
358
359 pub fn weight(&self) -> u64 {
362 let entry_sum: u64 = self
363 .entries
364 .values()
365 .map(|fv| {
366 24 + fv
367 .iter()
368 .map(|(f, v)| 48 + f.heap_bytes() as u64 + v.heap_bytes() as u64)
369 .sum::<u64>()
370 })
371 .sum();
372 (self.entries.len() as u64).saturating_mul(BTREE_SLOT_BYTES) + entry_sum
373 }
374
375 pub(crate) fn trim_minid(&mut self, floor: StreamId) -> usize {
377 let drop_ids: Vec<StreamId> = self
378 .entries
379 .range(..floor)
380 .map(|(id, _)| *id)
381 .collect();
382 let removed = drop_ids.len();
383 for id in drop_ids {
384 self.entries.remove(&id);
385 if id > self.max_deleted_id {
386 self.max_deleted_id = id;
387 }
388 }
389 removed
390 }
391}
392
393mod claim;
394mod group;
395mod load;
396mod store;
397#[allow(unused_imports)]
398pub use claim::AutoclaimResult;
399pub use load::{LoadedGroup, LoadedPelEntry};
400#[allow(unused_imports)]
401pub use group::{
402 ConsumerGroup, ConsumerState, GroupCreateMode, PelEntry, PendingExtended,
403 PendingExtendedRow, PendingSummary, ReadGroupId, XClaimOpts,
404};
405pub use store::EntryBatch;
406
407pub type LoadedStreamEntry = (u64, u64, Vec<(Vec<u8>, Vec<u8>)>);
411
412#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
420pub fn now_unix_ms() -> u64 {
421 SystemTime::now()
422 .duration_since(UNIX_EPOCH)
423 .map_or(0, |d| d.as_millis() as u64)
424}
425
426#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
427pub fn now_unix_ms() -> u64 {
428 crate::clock::wall_now_unix_ms()
429}
430
431pub(super) fn stream_entry_weight(fields: &[(SmallBytes, SmallBytes)]) -> u64 {
432 BTREE_SLOT_BYTES
434 + 24
435 + fields
436 .iter()
437 .map(|(f, v)| 48 + f.heap_bytes() as u64 + v.heap_bytes() as u64)
438 .sum::<u64>()
439}
440
441pub(super) fn clone_entries(
442 src: Vec<(StreamId, &[(SmallBytes, SmallBytes)])>,
443) -> EntryBatch {
444 src.into_iter()
445 .map(|(id, fv)| (id, fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect()))
446 .collect()
447}