1use std::sync::Arc;
9use std::time::Duration;
10
11use crate::value::{HashData, SetData, Value, ZSetData};
12use crate::{
13 Entry, RenameOutcome, SmallBytes, Store, deadline_at, glob_match, now_ns, pack_deadline,
14 remaining_ms,
15};
16
17impl Store {
18 pub fn del(&mut self, keys: &[Vec<u8>]) -> usize {
21 let now = now_ns();
22 let mut removed = 0;
23 for k in keys {
24 if self.reap(k, now) && self.remove_entry(k.as_slice()).is_some() {
25 removed += 1;
26 }
27 }
28 removed
29 }
30
31 pub fn exists(&mut self, keys: &[Vec<u8>]) -> usize {
32 keys.iter().filter(|k| self.live_entry(k).is_some()).count()
33 }
34
35 pub fn expire(&mut self, key: &[u8], ttl: Duration) -> bool {
36 let now = now_ns();
37 if !self.reap(key, now) {
38 return false;
39 }
40 let Some(e) = self.map.get_mut(key) else {
41 return false;
42 };
43 let had = e.expire_at_ns.is_some();
44 e.expire_at_ns = pack_deadline(deadline_at(now, ttl));
45 let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
46 self.adjust_expires(delta);
47 true
48 }
49
50 pub fn expire_at_unix_ms(&mut self, key: &[u8], deadline_ms: u64) -> bool {
60 let now = now_ns();
61 if !self.reap(key, now) || !self.map.contains_key(key) {
62 return false;
63 }
64 let wall_now = crate::now_unix_ms();
65 if deadline_ms <= wall_now {
66 self.remove_entry(key);
68 return true;
69 }
70 let remaining = Duration::from_millis(deadline_ms - wall_now);
71 if let Some(e) = self.map.get_mut(key) {
72 let had = e.expire_at_ns.is_some();
73 e.expire_at_ns = pack_deadline(deadline_at(now, remaining));
74 let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
75 self.adjust_expires(delta);
76 }
77 true
78 }
79
80 pub fn take_with_ttl(&mut self, key: &[u8]) -> Option<(Value, Option<u64>)> {
87 let now = now_ns();
88 if !self.reap(key, now) {
89 return None;
90 }
91 let entry = self.remove_entry(key)?;
92 let ttl_ms = entry.expire_at_ns.map(|ns| remaining_ms(ns, now));
93 Some((entry.value, ttl_ms))
94 }
95
96 pub fn put_with_ttl(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
102 let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
103 let entry = Entry::new(value, expire_at);
104 self.remove_entry(&key);
107 self.insert_entry(SmallBytes::from_vec(key), entry);
108 }
109
110 pub fn key_exists(&mut self, key: &[u8]) -> bool {
114 let now = now_ns();
115 self.reap(key, now) && self.map.contains_key(key)
116 }
117
118 pub fn rename(&mut self, src: &[u8], dst: &[u8], nx: bool) -> RenameOutcome {
126 let now = now_ns();
127 if !self.reap(src, now) {
128 return RenameOutcome::NoSuchSrc;
129 }
130 if src == dst {
131 return if nx {
135 RenameOutcome::DstExists
136 } else {
137 RenameOutcome::Renamed
138 };
139 }
140 if nx {
141 let dst_live = self.reap(dst, now) && self.map.contains_key(dst);
144 if dst_live {
145 return RenameOutcome::DstExists;
146 }
147 }
148 let Some(entry) = self.remove_entry(src) else {
151 return RenameOutcome::NoSuchSrc;
152 };
153 self.remove_entry(dst);
157 self.insert_entry(SmallBytes::from_vec(dst.to_vec()), entry);
158 RenameOutcome::Renamed
159 }
160
161 pub fn persist(&mut self, key: &[u8]) -> bool {
162 let now = now_ns();
163 if !self.reap(key, now) {
164 return false;
165 }
166 let cleared = match self.map.get_mut(key) {
167 Some(e) if e.expire_at_ns.is_some() => {
168 e.expire_at_ns = None;
169 true
170 }
171 _ => false,
172 };
173 if cleared {
174 self.adjust_expires(-1);
175 }
176 cleared
177 }
178
179 pub fn pttl(&mut self, key: &[u8]) -> i64 {
181 let now = now_ns();
182 if !self.reap(key, now) {
183 return -2;
184 }
185 match self.map.get(key).and_then(|e| e.expire_at_ns) {
186 None => -1,
187 Some(ns) => remaining_ms(ns, now) as i64,
188 }
189 }
190
191 pub fn type_of(&mut self, key: &[u8]) -> &'static str {
192 let now = now_ns();
193 if !self.reap(key, now) {
194 return "none";
195 }
196 self.map.get(key).map_or("none", |e| e.value.type_name())
197 }
198
    /// Returns the number of entries currently held in the map.
    ///
    /// This is the raw map length: no expiry check is performed here, so
    /// entries whose deadline has passed but which are still in the map are
    /// included in the count.
    pub fn dbsize(&self) -> usize {
        self.map.len()
    }
202
203 pub fn flushall(&mut self) {
211 self.map.clear();
212 self.used_memory = 0;
213 self.expires = 0;
214 }
216
    /// Deprecated alias for [`Store::flushall`]: wipes the whole keyspace.
    ///
    /// Kept only so existing callers keep compiling; it forwards straight to
    /// `flushall` and does nothing else.
    #[deprecated(
        since = "1.17.0",
        note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the keyspace"
    )]
    pub fn flush(&mut self) {
        self.flushall();
    }
226
227 pub fn ttl_pending_count(&self) -> usize {
232 let now = now_ns();
233 self.map
234 .values()
235 .filter(|e| e.expire_at_ns.is_some() && !e.is_expired_at(now))
236 .count()
237 }
238
239 pub fn snapshot_each<F: FnMut(&[u8], &Value, Option<u64>)>(&self, mut f: F) {
243 let now = now_ns();
244 for (k, e) in &self.map {
245 if e.is_expired_at(now) {
246 continue;
247 }
248 let ttl = e.expire_at_ns.map(|ns| remaining_ms(ns, now));
249 f(k.as_slice(), &e.value, ttl);
250 }
251 }
252
253 fn insert_loaded(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
254 let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
255 self.insert_entry(SmallBytes::from_vec(key), Entry::new(value, expire_at));
256 }
257
258 pub fn load_str(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: Option<u64>) {
259 self.insert_loaded(key, Value::Str(SmallBytes::from_vec(value)), ttl_ms);
260 }
261
262 pub fn load_hash(
263 &mut self,
264 key: Vec<u8>,
265 fields: Vec<(Vec<u8>, Vec<u8>)>,
266 ttl_ms: Option<u64>,
267 ) {
268 let hash_data: HashData = fields
270 .into_iter()
271 .map(|(f, v)| (SmallBytes::from_vec(f), v))
272 .collect();
273 self.insert_loaded(key, Value::Hash(Arc::new(hash_data)), ttl_ms);
274 }
275
276 pub fn load_list(&mut self, key: Vec<u8>, items: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
277 self.insert_loaded(key, Value::List(Arc::new(items.into_iter().collect())), ttl_ms);
278 }
279
280 pub fn load_set(&mut self, key: Vec<u8>, members: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
281 let set_data: SetData = members.into_iter().map(SmallBytes::from_vec).collect();
282 self.insert_loaded(key, Value::Set(Arc::new(set_data)), ttl_ms);
283 }
284
285 pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
288 let now = now_ns();
289 let mut out = Vec::new();
290 for (k, e) in &self.map {
291 if e.is_expired_at(now) {
292 continue;
293 }
294 if let Some(p) = pattern
295 && !glob_match(p, k.as_slice())
296 {
297 continue;
298 }
299 out.push(k.to_vec());
300 if limit.is_some_and(|lim| out.len() >= lim) {
301 break;
302 }
303 }
304 out
305 }
306
307 pub fn load_zset(&mut self, key: Vec<u8>, pairs: Vec<(Vec<u8>, f64)>, ttl_ms: Option<u64>) {
308 let mut z = ZSetData::default();
309 for (m, score) in pairs {
310 z.insert(&m, score);
311 }
312 self.insert_loaded(key, Value::ZSet(Arc::new(z)), ttl_ms);
313 }
314
315 pub fn load_value(&mut self, key: &[u8], value: &Value, ttl_ms: Option<u64>) {
320 let k = key.to_vec();
321 match value {
322 Value::Str(v) => self.load_str(k, v.to_vec(), ttl_ms),
323 Value::Hash(h) => self.load_hash(
324 k,
325 h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
326 ttl_ms,
327 ),
328 Value::List(l) => self.load_list(k, l.iter().cloned().collect(), ttl_ms),
329 Value::Set(s) => self.load_set(k, s.iter().map(kevy_bytes::SmallBytes::to_vec).collect(), ttl_ms),
330 Value::ZSet(z) => self.load_zset(
331 k,
332 z.ordered().map(|(m, sc)| (m.to_vec(), sc)).collect(),
333 ttl_ms,
334 ),
335 Value::Stream(st) => {
336 let entries: Vec<crate::stream::LoadedStreamEntry> = st
337 .iter_entries()
338 .map(|(id, fv)| {
339 let fvv = fv
340 .iter()
341 .map(|(f, v)| (f.as_slice().to_vec(), v.as_slice().to_vec()))
342 .collect();
343 (id.ms, id.seq, fvv)
344 })
345 .collect();
346 let last = st.last_id();
347 let mxd = st.max_deleted_id();
348 self.load_stream(
349 k,
350 entries,
351 (last.ms, last.seq),
352 (mxd.ms, mxd.seq),
353 st.entries_added(),
354 st.export_groups(),
355 ttl_ms,
356 );
357 }
358 }
359 }
360
361 #[allow(clippy::too_many_arguments)]
367 pub fn load_stream(
368 &mut self,
369 key: Vec<u8>,
370 entries: Vec<crate::stream::LoadedStreamEntry>,
371 last_id: (u64, u64),
372 max_deleted_id: (u64, u64),
373 entries_added: u64,
374 groups: Vec<crate::stream::LoadedGroup>,
375 ttl_ms: Option<u64>,
376 ) {
377 let mut s = crate::stream::StreamData::default();
378 for (ms, seq, fv) in entries {
379 let id = crate::stream::StreamId { ms, seq };
380 let fv_small: Vec<(SmallBytes, SmallBytes)> = fv
381 .into_iter()
382 .map(|(f, v)| (SmallBytes::from_vec(f), SmallBytes::from_vec(v)))
383 .collect();
384 s.load_entry(id, fv_small);
385 }
386 s.set_loaded_state(
387 crate::stream::StreamId { ms: last_id.0, seq: last_id.1 },
388 crate::stream::StreamId { ms: max_deleted_id.0, seq: max_deleted_id.1 },
389 entries_added,
390 );
391 s.import_groups(groups);
392 self.insert_loaded(key, Value::Stream(Arc::new(s)), ttl_ms);
393 }
394}