1use std::sync::Arc;
9use std::time::Duration;
10
11use crate::value::{HashData, SetData, Value, ZSetData};
12use crate::{
13 Entry, RenameOutcome, SmallBytes, Store, deadline_at, glob_match, now_ns, pack_deadline,
14 remaining_ms,
15};
16
17impl Store {
18 pub fn del(&mut self, keys: &[Vec<u8>]) -> usize {
21 let now = now_ns();
22 let mut removed = 0;
23 for k in keys {
24 if self.reap(k, now) && self.remove_entry(k.as_slice()).is_some() {
25 removed += 1;
26 }
27 }
28 removed
29 }
30
31 pub fn del_borrowed(&mut self, keys: &[&[u8]]) -> usize {
35 let now = now_ns();
36 let mut removed = 0;
37 for k in keys {
38 if self.reap(k, now) && self.remove_entry(k).is_some() {
39 removed += 1;
40 }
41 }
42 removed
43 }
44
45 pub fn exists(&mut self, keys: &[Vec<u8>]) -> usize {
46 keys.iter().filter(|k| self.live_entry(k).is_some()).count()
47 }
48
49 pub fn exists_borrowed(&mut self, keys: &[&[u8]]) -> usize {
51 keys.iter().filter(|k| self.live_entry(k).is_some()).count()
52 }
53
54 pub fn expire(&mut self, key: &[u8], ttl: Duration) -> bool {
55 let now = now_ns();
56 if !self.reap(key, now) {
57 return false;
58 }
59 let Some(e) = self.map.get_mut(key) else {
60 return false;
61 };
62 let had = e.expire_at_ns.is_some();
63 e.expire_at_ns = pack_deadline(deadline_at(now, ttl));
64 let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
65 self.adjust_expires(delta);
66 true
67 }
68
69 pub fn expire_at_unix_ms(&mut self, key: &[u8], deadline_ms: u64) -> bool {
79 let now = now_ns();
80 if !self.reap(key, now) || !self.map.contains_key(key) {
81 return false;
82 }
83 let wall_now = crate::now_unix_ms();
84 if deadline_ms <= wall_now {
85 self.remove_entry(key);
87 return true;
88 }
89 let remaining = Duration::from_millis(deadline_ms - wall_now);
90 if let Some(e) = self.map.get_mut(key) {
91 let had = e.expire_at_ns.is_some();
92 e.expire_at_ns = pack_deadline(deadline_at(now, remaining));
93 let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
94 self.adjust_expires(delta);
95 }
96 true
97 }
98
99 pub fn take_with_ttl(&mut self, key: &[u8]) -> Option<(Value, Option<u64>)> {
106 let now = now_ns();
107 if !self.reap(key, now) {
108 return None;
109 }
110 let entry = self.remove_entry(key)?;
111 let ttl_ms = entry.expire_at_ns.map(|ns| remaining_ms(ns, now));
112 Some((entry.value, ttl_ms))
113 }
114
115 pub fn put_with_ttl(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
121 let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
122 let entry = Entry::new(value, expire_at);
123 self.remove_entry(&key);
126 self.insert_entry(SmallBytes::from_vec(key), entry);
127 }
128
129 pub fn key_exists(&mut self, key: &[u8]) -> bool {
133 let now = now_ns();
134 self.reap(key, now) && self.map.contains_key(key)
135 }
136
137 pub fn rename(&mut self, src: &[u8], dst: &[u8], nx: bool) -> RenameOutcome {
145 let now = now_ns();
146 if !self.reap(src, now) {
147 return RenameOutcome::NoSuchSrc;
148 }
149 if src == dst {
150 return if nx {
154 RenameOutcome::DstExists
155 } else {
156 RenameOutcome::Renamed
157 };
158 }
159 if nx {
160 let dst_live = self.reap(dst, now) && self.map.contains_key(dst);
163 if dst_live {
164 return RenameOutcome::DstExists;
165 }
166 }
167 let Some(entry) = self.remove_entry(src) else {
170 return RenameOutcome::NoSuchSrc;
171 };
172 self.remove_entry(dst);
176 self.insert_entry(SmallBytes::from_vec(dst.to_vec()), entry);
177 RenameOutcome::Renamed
178 }
179
180 pub fn persist(&mut self, key: &[u8]) -> bool {
181 let now = now_ns();
182 if !self.reap(key, now) {
183 return false;
184 }
185 let cleared = match self.map.get_mut(key) {
186 Some(e) if e.expire_at_ns.is_some() => {
187 e.expire_at_ns = None;
188 true
189 }
190 _ => false,
191 };
192 if cleared {
193 self.adjust_expires(-1);
194 }
195 cleared
196 }
197
198 pub fn pttl(&mut self, key: &[u8]) -> i64 {
200 let now = now_ns();
201 if !self.reap(key, now) {
202 return -2;
203 }
204 match self.map.get(key).and_then(|e| e.expire_at_ns) {
205 None => -1,
206 Some(ns) => remaining_ms(ns, now) as i64,
207 }
208 }
209
210 pub fn type_of(&mut self, key: &[u8]) -> &'static str {
211 let now = now_ns();
212 if !self.reap(key, now) {
213 return "none";
214 }
215 self.map.get(key).map_or("none", |e| e.value.type_name())
216 }
217
218 pub fn dbsize(&self) -> usize {
219 self.map.len()
220 }
221
222 pub fn flushall(&mut self) {
230 self.map.clear();
231 self.used_memory = 0;
232 self.expires = 0;
233 }
235
236 #[deprecated(
239 since = "1.17.0",
240 note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the keyspace"
241 )]
242 pub fn flush(&mut self) {
243 self.flushall();
244 }
245
246 pub fn ttl_pending_count(&self) -> usize {
251 let now = now_ns();
252 self.map
253 .values()
254 .filter(|e| e.expire_at_ns.is_some() && !e.is_expired_at(now))
255 .count()
256 }
257
258 pub fn snapshot_each<F: FnMut(&[u8], &Value, Option<u64>)>(&self, mut f: F) {
262 let now = now_ns();
263 for (k, e) in &self.map {
264 if e.is_expired_at(now) {
265 continue;
266 }
267 let ttl = e.expire_at_ns.map(|ns| remaining_ms(ns, now));
268 f(k.as_slice(), &e.value, ttl);
269 }
270 }
271
272 fn insert_loaded(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
273 let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
274 self.insert_entry(SmallBytes::from_vec(key), Entry::new(value, expire_at));
275 }
276
277 pub fn load_str(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: Option<u64>) {
278 self.insert_loaded(key, Value::Str(SmallBytes::from_vec(value)), ttl_ms);
279 }
280
281 pub fn load_hash(
282 &mut self,
283 key: Vec<u8>,
284 fields: Vec<(Vec<u8>, Vec<u8>)>,
285 ttl_ms: Option<u64>,
286 ) {
287 let hash_data: HashData = fields
289 .into_iter()
290 .map(|(f, v)| (SmallBytes::from_vec(f), v))
291 .collect();
292 self.insert_loaded(key, Value::Hash(Arc::new(hash_data)), ttl_ms);
293 }
294
295 pub fn load_list(&mut self, key: Vec<u8>, items: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
296 self.insert_loaded(key, Value::List(Arc::new(items.into_iter().collect())), ttl_ms);
297 }
298
299 pub fn load_set(&mut self, key: Vec<u8>, members: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
300 let set_data: SetData = members.into_iter().map(SmallBytes::from_vec).collect();
301 self.insert_loaded(key, Value::Set(Arc::new(set_data)), ttl_ms);
302 }
303
304 pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
307 let now = now_ns();
308 let mut out = Vec::new();
309 for (k, e) in &self.map {
310 if e.is_expired_at(now) {
311 continue;
312 }
313 if let Some(p) = pattern
314 && !glob_match(p, k.as_slice())
315 {
316 continue;
317 }
318 out.push(k.to_vec());
319 if limit.is_some_and(|lim| out.len() >= lim) {
320 break;
321 }
322 }
323 out
324 }
325
326 pub fn load_zset(&mut self, key: Vec<u8>, pairs: Vec<(Vec<u8>, f64)>, ttl_ms: Option<u64>) {
327 let mut z = ZSetData::default();
328 for (m, score) in pairs {
329 z.insert(&m, score);
330 }
331 self.insert_loaded(key, Value::ZSet(Arc::new(z)), ttl_ms);
332 }
333
334 pub fn load_value(&mut self, key: &[u8], value: &Value, ttl_ms: Option<u64>) {
339 let k = key.to_vec();
340 match value {
341 Value::Str(v) => self.load_str(k, v.to_vec(), ttl_ms),
342 Value::Int(n) => {
346 self.insert_entry(
347 crate::value::SmallBytes::from_slice(&k),
348 crate::Entry::new(Value::Int(*n), ttl_ms.map(|ms| crate::deadline_at(crate::now_ns(), std::time::Duration::from_millis(ms)))),
349 );
350 }
351 Value::ArcBulk(a) => {
354 self.insert_entry(
355 crate::value::SmallBytes::from_slice(&k),
356 crate::Entry::new(Value::ArcBulk(a.clone()), ttl_ms.map(|ms| crate::deadline_at(crate::now_ns(), std::time::Duration::from_millis(ms)))),
357 );
358 }
359 Value::Hash(h) => self.load_hash(
360 k,
361 h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
362 ttl_ms,
363 ),
364 Value::SmallHashInline(h) => self.load_hash(
369 k,
370 h.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
371 ttl_ms,
372 ),
373 Value::List(l) => self.load_list(k, l.iter().cloned().collect(), ttl_ms),
374 Value::SmallListInline(l) => self.load_list(
375 k,
376 l.iter().map(<[u8]>::to_vec).collect(),
377 ttl_ms,
378 ),
379 Value::Set(s) => self.load_set(k, s.iter().map(kevy_bytes::SmallBytes::to_vec).collect(), ttl_ms),
380 Value::SmallSetInline(s) => self.load_set(
389 k,
390 s.iter_slices().map(<[u8]>::to_vec).collect(),
391 ttl_ms,
392 ),
393 Value::ZSet(z) => self.load_zset(
394 k,
395 z.ordered().map(|(m, sc)| (m.to_vec(), sc)).collect(),
396 ttl_ms,
397 ),
398 Value::SmallZSetInline(z) => self.load_zset(
399 k,
400 z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect(),
401 ttl_ms,
402 ),
403 Value::Stream(st) => {
404 let entries: Vec<crate::stream::LoadedStreamEntry> = st
405 .iter_entries()
406 .map(|(id, fv)| {
407 let fvv = fv
408 .iter()
409 .map(|(f, v)| (f.as_slice().to_vec(), v.as_slice().to_vec()))
410 .collect();
411 (id.ms, id.seq, fvv)
412 })
413 .collect();
414 let last = st.last_id();
415 let mxd = st.max_deleted_id();
416 self.load_stream(
417 k,
418 entries,
419 (last.ms, last.seq),
420 (mxd.ms, mxd.seq),
421 st.entries_added(),
422 st.export_groups(),
423 ttl_ms,
424 );
425 }
426 }
427 }
428
429 #[allow(clippy::too_many_arguments)]
435 pub fn load_stream(
436 &mut self,
437 key: Vec<u8>,
438 entries: Vec<crate::stream::LoadedStreamEntry>,
439 last_id: (u64, u64),
440 max_deleted_id: (u64, u64),
441 entries_added: u64,
442 groups: Vec<crate::stream::LoadedGroup>,
443 ttl_ms: Option<u64>,
444 ) {
445 let mut s = crate::stream::StreamData::default();
446 for (ms, seq, fv) in entries {
447 let id = crate::stream::StreamId { ms, seq };
448 let fv_small: Vec<(SmallBytes, SmallBytes)> = fv
449 .into_iter()
450 .map(|(f, v)| (SmallBytes::from_vec(f), SmallBytes::from_vec(v)))
451 .collect();
452 s.load_entry(id, fv_small);
453 }
454 s.set_loaded_state(
455 crate::stream::StreamId { ms: last_id.0, seq: last_id.1 },
456 crate::stream::StreamId { ms: max_deleted_id.0, seq: max_deleted_id.1 },
457 entries_added,
458 );
459 s.import_groups(groups);
460 self.insert_loaded(key, Value::Stream(Arc::new(s)), ttl_ms);
461 }
462}