1use super::group::{AutoclaimResult, ReadGroupId};
8use super::{
9 GroupCreateMode, PendingExtended, PendingSummary, StreamData, StreamId, XAddIdSpec, XClaimOpts,
10};
11use crate::value::{SmallBytes, Value};
12use crate::{Entry, Store, StoreError};
13use std::sync::Arc;
14
/// A batch of stream entries: each element pairs an entry's [`StreamId`] with
/// that entry's list of `(field, value)` byte pairs.
pub type EntryBatch = Vec<(StreamId, Vec<(Vec<u8>, Vec<u8>)>)>;
19
20impl Store {
21 fn stream_mut(
22 &mut self,
23 key: &[u8],
24 create: bool,
25 ) -> Result<Option<&mut StreamData>, StoreError> {
26 if self.live_entry_mut(key).is_none() {
27 if !create {
28 return Ok(None);
29 }
30 self.insert_entry(
31 SmallBytes::from_slice(key),
32 Entry::new(Value::Stream(Arc::default()), None),
33 );
34 }
35 match &mut self.map.get_mut(key).expect("present").value {
36 Value::Stream(s) => Ok(Some(Arc::make_mut(s))),
37 _ => Err(StoreError::WrongType),
38 }
39 }
40
41 fn stream_ref(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
42 match self.live_entry(key) {
43 None => Ok(None),
44 Some(e) => match &e.value {
45 Value::Stream(s) => Ok(Some(s.as_ref())),
46 _ => Err(StoreError::WrongType),
47 },
48 }
49 }
50
51 pub fn stream_view(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
56 self.stream_ref(key)
57 }
58
59 pub fn xadd(
64 &mut self,
65 key: &[u8],
66 spec: XAddIdSpec,
67 fields: Vec<(Vec<u8>, Vec<u8>)>,
68 nomkstream: bool,
69 now_ms: u64,
70 ) -> Result<Option<StreamId>, StoreError> {
71 if nomkstream && self.live_entry(key).is_none() {
72 return Ok(None);
73 }
74 let id;
75 let weight_delta;
76 {
77 let s = self.stream_mut(key, true)?.expect("created");
78 id = s.resolve_xadd_id(spec, now_ms)?;
79 let smb_fields: Vec<(SmallBytes, SmallBytes)> = fields
80 .into_iter()
81 .map(|(f, v)| (SmallBytes::from_slice(&f), SmallBytes::from_slice(&v)))
82 .collect();
83 weight_delta = super::stream_entry_weight(&smb_fields);
84 s.insert(id, smb_fields);
85 }
86 self.bump_if_watched(key);
87 self.account_delta(key, weight_delta as i64);
88 Ok(Some(id))
89 }
90
91 pub fn xlen(&mut self, key: &[u8]) -> Result<u64, StoreError> {
93 Ok(self.stream_ref(key)?.map_or(0, super::StreamData::length))
94 }
95
96 pub fn xrange(
98 &mut self,
99 key: &[u8],
100 start: StreamId,
101 end: StreamId,
102 count: Option<usize>,
103 ) -> Result<EntryBatch, StoreError> {
104 Ok(self
105 .stream_ref(key)?
106 .map_or_else(Vec::new, |s| super::clone_entries(s.range(start, end, count))))
107 }
108
109 pub fn xrevrange(
111 &mut self,
112 key: &[u8],
113 start: StreamId,
114 end: StreamId,
115 count: Option<usize>,
116 ) -> Result<EntryBatch, StoreError> {
117 Ok(self
118 .stream_ref(key)?
119 .map_or_else(Vec::new, |s| super::clone_entries(s.revrange(start, end, count))))
120 }
121
122 pub fn xread(
124 &mut self,
125 key: &[u8],
126 last_seen: StreamId,
127 count: Option<usize>,
128 ) -> Result<EntryBatch, StoreError> {
129 Ok(self
130 .stream_ref(key)?
131 .map_or_else(Vec::new, |s| super::clone_entries(s.read_after(last_seen, count))))
132 }
133
134 pub fn xread_dollar_last_id(&mut self, key: &[u8]) -> Result<StreamId, StoreError> {
137 Ok(self.stream_ref(key)?.map_or(StreamId::MIN, super::StreamData::last_id))
138 }
139
140 pub fn xdel(&mut self, key: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
142 let n;
143 {
144 let Some(s) = self.stream_mut(key, false)? else {
145 return Ok(0);
146 };
147 n = s.del_ids(ids);
148 }
149 if n > 0 {
150 self.bump_if_watched(key);
151 self.reweigh_entry(key);
152 }
153 Ok(n as u64)
154 }
155
156 pub fn xtrim_maxlen(&mut self, key: &[u8], maxlen: u64) -> Result<u64, StoreError> {
158 let n;
159 {
160 let Some(s) = self.stream_mut(key, false)? else {
161 return Ok(0);
162 };
163 n = s.trim_maxlen(maxlen as usize);
164 }
165 if n > 0 {
166 self.bump_if_watched(key);
167 self.reweigh_entry(key);
168 }
169 Ok(n as u64)
170 }
171
172 pub fn xtrim_minid(&mut self, key: &[u8], minid: StreamId) -> Result<u64, StoreError> {
174 let n;
175 {
176 let Some(s) = self.stream_mut(key, false)? else {
177 return Ok(0);
178 };
179 n = s.trim_minid(minid);
180 }
181 if n > 0 {
182 self.bump_if_watched(key);
183 self.reweigh_entry(key);
184 }
185 Ok(n as u64)
186 }
187
188 pub fn xsetid(
193 &mut self,
194 key: &[u8],
195 last_id: StreamId,
196 entries_added: Option<u64>,
197 max_deleted_id: Option<StreamId>,
198 ) -> Result<(), StoreError> {
199 {
200 let Some(s) = self.stream_mut(key, false)? else {
201 return Err(StoreError::NoSuchKey);
202 };
203 s.xsetid(last_id, entries_added, max_deleted_id)?;
204 }
205 self.bump_if_watched(key);
206 Ok(())
207 }
208
209 pub fn xgroup_create(
216 &mut self,
217 key: &[u8],
218 group: &[u8],
219 mode: GroupCreateMode,
220 mkstream: bool,
221 ) -> Result<bool, StoreError> {
222 let exists = self.live_entry(key).is_some();
223 if !exists && !mkstream {
224 return Err(StoreError::NoSuchKey);
225 }
226 let s = self.stream_mut(key, true)?.expect("created");
227 let created = s.group_create(group, mode)?;
228 self.bump_if_watched(key);
229 self.reweigh_entry(key);
230 Ok(created)
231 }
232
233 pub fn xgroup_destroy(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
235 let dropped;
236 {
237 let Some(s) = self.stream_mut(key, false)? else {
238 return Ok(false);
239 };
240 dropped = s.group_destroy(group);
241 }
242 if dropped {
243 self.bump_if_watched(key);
244 self.reweigh_entry(key);
245 }
246 Ok(dropped)
247 }
248
249 pub fn xgroup_setid(
251 &mut self,
252 key: &[u8],
253 group: &[u8],
254 mode: GroupCreateMode,
255 ) -> Result<bool, StoreError> {
256 let touched;
257 {
258 let Some(s) = self.stream_mut(key, false)? else {
259 return Ok(false);
260 };
261 touched = s.group_setid(group, mode);
262 }
263 if touched {
264 self.bump_if_watched(key);
265 }
266 Ok(touched)
267 }
268
269 pub fn xgroup_create_consumer(
271 &mut self,
272 key: &[u8],
273 group: &[u8],
274 consumer: &[u8],
275 now_ms: u64,
276 ) -> Result<bool, StoreError> {
277 let Some(s) = self.stream_mut(key, false)? else {
278 return Ok(false);
279 };
280 Ok(s.group_create_consumer(group, consumer, now_ms))
281 }
282
283 pub fn xgroup_del_consumer(
285 &mut self,
286 key: &[u8],
287 group: &[u8],
288 consumer: &[u8],
289 ) -> Result<u64, StoreError> {
290 let Some(s) = self.stream_mut(key, false)? else {
291 return Ok(0);
292 };
293 Ok(s.group_del_consumer(group, consumer))
294 }
295
296 #[allow(clippy::too_many_arguments)]
298 pub fn xreadgroup(
299 &mut self,
300 key: &[u8],
301 group: &[u8],
302 consumer: &[u8],
303 last_seen: ReadGroupId,
304 count: Option<usize>,
305 noack: bool,
306 now_ms: u64,
307 ) -> Result<EntryBatch, StoreError> {
308 let result;
309 {
310 let Some(s) = self.stream_mut(key, false)? else {
311 return Err(StoreError::NoSuchKey);
312 };
313 result = s.readgroup(group, consumer, last_seen, count, noack, now_ms)?;
314 }
315 if !result.is_empty() {
316 self.bump_if_watched(key);
317 }
318 Ok(result)
319 }
320
321 pub fn xreadgroup_has_new(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
327 Ok(self
328 .stream_ref(key)?
329 .and_then(|s| s.group(group).map(|g| s.last_id() > g.last_delivered_id()))
330 .unwrap_or(false))
331 }
332
333 pub fn xack(&mut self, key: &[u8], group: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
335 let n;
336 {
337 let Some(s) = self.stream_mut(key, false)? else {
338 return Ok(0);
339 };
340 n = s.ack(group, ids);
341 }
342 if n > 0 {
343 self.bump_if_watched(key);
344 }
345 Ok(n)
346 }
347
348 pub fn xpending_summary(
350 &mut self,
351 key: &[u8],
352 group: &[u8],
353 ) -> Result<Option<PendingSummary>, StoreError> {
354 Ok(self.stream_ref(key)?.and_then(|s| s.pending_summary(group)))
355 }
356
357 #[allow(clippy::too_many_arguments)]
360 pub fn xpending_extended(
361 &mut self,
362 key: &[u8],
363 group: &[u8],
364 idle_min_ms: Option<u64>,
365 start: StreamId,
366 end: StreamId,
367 count: usize,
368 consumer_filter: Option<&[u8]>,
369 now_ms: u64,
370 ) -> Result<Option<PendingExtended>, StoreError> {
371 Ok(self.stream_ref(key)?.and_then(|s| {
372 s.pending_extended(group, idle_min_ms, start, end, count, consumer_filter, now_ms)
373 }))
374 }
375
376 pub fn xclaim(
380 &mut self,
381 key: &[u8],
382 group: &[u8],
383 new_owner: &[u8],
384 ids: &[StreamId],
385 opts: &XClaimOpts,
386 now_ms: u64,
387 ) -> Result<EntryBatch, StoreError> {
388 let claimed;
389 let payloads;
390 {
391 let Some(s) = self.stream_mut(key, false)? else {
392 return Err(StoreError::NoSuchKey);
393 };
394 claimed = s.claim(group, new_owner, ids, opts, now_ms)?;
395 payloads = s.payloads_for(&claimed);
396 }
397 if !claimed.is_empty() {
398 self.bump_if_watched(key);
399 }
400 Ok(payloads)
401 }
402
403 #[allow(clippy::too_many_arguments)]
406 pub fn xautoclaim(
407 &mut self,
408 key: &[u8],
409 group: &[u8],
410 new_owner: &[u8],
411 min_idle_ms: u64,
412 start: StreamId,
413 count: usize,
414 justid: bool,
415 now_ms: u64,
416 ) -> Result<(StreamId, EntryBatch, Vec<StreamId>), StoreError> {
417 let payloads;
418 let next_cursor;
419 let deleted_ids;
420 {
421 let Some(s) = self.stream_mut(key, false)? else {
422 return Err(StoreError::NoSuchKey);
423 };
424 let AutoclaimResult { next_cursor: nc, claimed_ids, deleted_ids: di } =
425 s.autoclaim(group, new_owner, min_idle_ms, start, count, justid, now_ms)?;
426 payloads = s.payloads_for(&claimed_ids);
427 next_cursor = nc;
428 deleted_ids = di;
429 }
430 if !payloads.is_empty() {
431 self.bump_if_watched(key);
432 }
433 Ok((next_cursor, payloads, deleted_ids))
434 }
435}