Skip to main content

kevy_store/
keyspace.rs

//! Generic key operations + persistence hooks on [`Store`]:
//! `del`/`exists`/`expire`/`persist`/`pttl`/`type_of`/`dbsize`/`flushall`/
//! `snapshot_each`/`load_*`/`collect_keys`. Type-agnostic; typed accessors
//! live in the per-type modules (string/hash/list/set/zset).
//!
//! Split out of [`crate`] for file-size hygiene.
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::value::{HashData, SetData, Value, ZSetData};
12use crate::{
13    Entry, RenameOutcome, SmallBytes, Store, deadline_at, glob_match, now_ns, pack_deadline,
14    remaining_ms,
15};
16
17impl Store {
18    // ---- generic key ops (type-agnostic) -------------------------------
19
20    pub fn del(&mut self, keys: &[Vec<u8>]) -> usize {
21        let now = now_ns();
22        let mut removed = 0;
23        for k in keys {
24            if self.reap(k, now) && self.remove_entry(k.as_slice()).is_some() {
25                removed += 1;
26            }
27        }
28        removed
29    }
30
31    /// G4 (v1.25): borrowed-slice `DEL` — kills the per-key `Vec<u8>` alloc
32    /// the dispatch layer used to do via `rest(args, 1)`. Behaviour identical
33    /// to [`Self::del`].
34    pub fn del_borrowed(&mut self, keys: &[&[u8]]) -> usize {
35        let now = now_ns();
36        let mut removed = 0;
37        for k in keys {
38            if self.reap(k, now) && self.remove_entry(k).is_some() {
39                removed += 1;
40            }
41        }
42        removed
43    }
44
45    pub fn exists(&mut self, keys: &[Vec<u8>]) -> usize {
46        keys.iter().filter(|k| self.live_entry(k).is_some()).count()
47    }
48
49    /// G4 (v1.25): borrowed-slice `EXISTS` — see [`Self::del_borrowed`].
50    pub fn exists_borrowed(&mut self, keys: &[&[u8]]) -> usize {
51        keys.iter().filter(|k| self.live_entry(k).is_some()).count()
52    }
53
54    pub fn expire(&mut self, key: &[u8], ttl: Duration) -> bool {
55        let now = now_ns();
56        if !self.reap(key, now) {
57            return false;
58        }
59        let Some(e) = self.map.get_mut(key) else {
60            return false;
61        };
62        let had = e.expire_at_ns.is_some();
63        e.expire_at_ns = pack_deadline(deadline_at(now, ttl));
64        let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
65        self.adjust_expires(delta);
66        true
67    }
68
69    /// `EXPIREAT`/`PEXPIREAT` semantics: set an **absolute** wall-clock
70    /// deadline (Unix epoch millis). This is the persistence-safe form —
71    /// a deadline survives restart unchanged, unlike the relative
72    /// [`Self::expire`] (whose duration is re-anchored to "now"). A
73    /// deadline already in the past deletes the key immediately (Redis
74    /// behaviour). Returns `true` iff the key existed (and was either
75    /// re-dated or deleted). The wall-clock → monotonic-`Instant`
76    /// conversion happens here so callers persist absolute time but the
77    /// hot path keeps its cheap monotonic deadline.
78    pub fn expire_at_unix_ms(&mut self, key: &[u8], deadline_ms: u64) -> bool {
79        let now = now_ns();
80        if !self.reap(key, now) || !self.map.contains_key(key) {
81            return false;
82        }
83        let wall_now = crate::now_unix_ms();
84        if deadline_ms <= wall_now {
85            // Past deadline: delete now, just like Redis EXPIREAT in the past.
86            self.remove_entry(key);
87            return true;
88        }
89        let remaining = Duration::from_millis(deadline_ms - wall_now);
90        if let Some(e) = self.map.get_mut(key) {
91            let had = e.expire_at_ns.is_some();
92            e.expire_at_ns = pack_deadline(deadline_at(now, remaining));
93            let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
94            self.adjust_expires(delta);
95        }
96        true
97    }
98
99    /// Cross-shard RENAME step 1: atomically remove the entry at
100    /// `key` (if any), returning the `(value, ttl_ms_remaining)`. The
101    /// orchestrator on the origin shard ships the result into a
102    /// follow-up [`Self::put_with_ttl`] on the destination shard.
103    /// Lazy-reaps an expired entry before the take (so an expired
104    /// key is observed as `None`, not silently rehomed).
105    pub fn take_with_ttl(&mut self, key: &[u8]) -> Option<(Value, Option<u64>)> {
106        let now = now_ns();
107        if !self.reap(key, now) {
108            return None;
109        }
110        let entry = self.remove_entry(key)?;
111        let ttl_ms = entry.expire_at_ns.map(|ns| remaining_ms(ns, now));
112        Some((entry.value, ttl_ms))
113    }
114
115    /// Cross-shard RENAME step 2: write `value` at `key` on this
116    /// shard, overwriting any prior entry. `ttl_ms` is set as a TTL
117    /// relative to *now* (i.e. the orchestrator should have computed
118    /// the remaining TTL on the source shard via `take_with_ttl` and
119    /// is shipping that exact remaining value here).
120    pub fn put_with_ttl(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
121        let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
122        let entry = Entry::new(value, expire_at);
123        // Overwrite — drop any existing entry first so the accounting
124        // doesn't double-count.
125        self.remove_entry(&key);
126        self.insert_entry(SmallBytes::from_vec(key), entry);
127    }
128
129    /// Whether a live (non-expired) entry exists at `key`. Reaps an
130    /// expired entry as a side effect. Used by the cross-shard RENAME
131    /// orchestrator's `nx` pre-check.
132    pub fn key_exists(&mut self, key: &[u8]) -> bool {
133        let now = now_ns();
134        self.reap(key, now) && self.map.contains_key(key)
135    }
136
137    /// `RENAME` (or `RENAMENX` if `nx`). Atomic on this shard. Returns
138    /// the outcome so the dispatch layer can emit the right RESP frame
139    /// (RENAME: `+OK` or `-ERR no such key`; RENAMENX: `:1`/`:0`/error).
140    ///
141    /// Cross-shard rename is the runtime's job — by the time this is
142    /// called, both `src` and `dst` are guaranteed to live on the same
143    /// shard. See `kevy-rt::start_rename` for the cross-shard split.
144    pub fn rename(&mut self, src: &[u8], dst: &[u8], nx: bool) -> RenameOutcome {
145        let now = now_ns();
146        if !self.reap(src, now) {
147            return RenameOutcome::NoSuchSrc;
148        }
149        if src == dst {
150            // Redis 6+ semantics: same-key rename is a no-op `+OK`.
151            // (RENAMENX same-key returns `:0` per Redis since dst
152            // technically already exists at src's address.)
153            return if nx {
154                RenameOutcome::DstExists
155            } else {
156                RenameOutcome::Renamed
157            };
158        }
159        if nx {
160            // Reap dst before the existence test so a TTL-expired dst
161            // doesn't block the rename.
162            let dst_live = self.reap(dst, now) && self.map.contains_key(dst);
163            if dst_live {
164                return RenameOutcome::DstExists;
165            }
166        }
167        // Take src's entry out. `remove_entry` returns the full Entry
168        // (value + TTL) — preserves TTL across rename, matching Redis.
169        let Some(entry) = self.remove_entry(src) else {
170            return RenameOutcome::NoSuchSrc;
171        };
172        // Drop any pre-existing dst (overwrite semantics). reap above
173        // already handled TTL-expired dst, but the live-dst case still
174        // needs removal.
175        self.remove_entry(dst);
176        self.insert_entry(SmallBytes::from_vec(dst.to_vec()), entry);
177        RenameOutcome::Renamed
178    }
179
180    pub fn persist(&mut self, key: &[u8]) -> bool {
181        let now = now_ns();
182        if !self.reap(key, now) {
183            return false;
184        }
185        let cleared = match self.map.get_mut(key) {
186            Some(e) if e.expire_at_ns.is_some() => {
187                e.expire_at_ns = None;
188                true
189            }
190            _ => false,
191        };
192        if cleared {
193            self.adjust_expires(-1);
194        }
195        cleared
196    }
197
198    /// Remaining TTL in ms: `-2` no key, `-1` no expiry, else `>= 0`.
199    pub fn pttl(&mut self, key: &[u8]) -> i64 {
200        let now = now_ns();
201        if !self.reap(key, now) {
202            return -2;
203        }
204        match self.map.get(key).and_then(|e| e.expire_at_ns) {
205            None => -1,
206            Some(ns) => remaining_ms(ns, now) as i64,
207        }
208    }
209
210    pub fn type_of(&mut self, key: &[u8]) -> &'static str {
211        let now = now_ns();
212        if !self.reap(key, now) {
213            return "none";
214        }
215        self.map.get(key).map_or("none", |e| e.value.type_name())
216    }
217
218    pub fn dbsize(&self) -> usize {
219        self.map.len()
220    }
221
222    /// Wipe every key in this shard's keyspace (the `FLUSHALL`/`FLUSHDB`
223    /// primitive). Resets `used_memory`; `used_memory_peak` is
224    /// lifetime-cumulative and intentionally not reset.
225    ///
226    /// Named `flushall` — **not** `flush` — to avoid colliding with
227    /// `Write::flush`'s "sync buffered writes to disk" meaning. This method
228    /// DESTROYS data; it does not persist it.
229    pub fn flushall(&mut self) {
230        self.map.clear();
231        self.used_memory = 0;
232        self.expires = 0;
233        // peak is lifetime-cumulative; intentionally not reset.
234    }
235
236    /// Deprecated alias for [`Self::flushall`]. The old name read like
237    /// `Write::flush` (sync-to-disk) but actually WIPES the keyspace.
238    #[deprecated(
239        since = "1.17.0",
240        note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the keyspace"
241    )]
242    pub fn flush(&mut self) {
243        self.flushall();
244    }
245
246    /// Count live (non-expired) keys that carry a TTL — the size of the
247    /// "expire set" Redis tracks. Useful as an introspection signal for
248    /// confirming the TTL subsystem actually registered keys. O(n) over the
249    /// keyspace; call it for diagnostics, not on the hot path.
250    pub fn ttl_pending_count(&self) -> usize {
251        let now = now_ns();
252        self.map
253            .values()
254            .filter(|e| e.expire_at_ns.is_some() && !e.is_expired_at(now))
255            .count()
256    }
257
258    // ---- persistence hooks ---------------------------------------------
259
260    /// Visit every live entry as `(key, &value, ttl_ms)` for snapshotting.
261    pub fn snapshot_each<F: FnMut(&[u8], &Value, Option<u64>)>(&self, mut f: F) {
262        let now = now_ns();
263        for (k, e) in &self.map {
264            if e.is_expired_at(now) {
265                continue;
266            }
267            let ttl = e.expire_at_ns.map(|ns| remaining_ms(ns, now));
268            f(k.as_slice(), &e.value, ttl);
269        }
270    }
271
272    fn insert_loaded(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
273        let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
274        self.insert_entry(SmallBytes::from_vec(key), Entry::new(value, expire_at));
275    }
276
277    pub fn load_str(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: Option<u64>) {
278        self.insert_loaded(key, Value::Str(SmallBytes::from_vec(value)), ttl_ms);
279    }
280
281    pub fn load_hash(
282        &mut self,
283        key: Vec<u8>,
284        fields: Vec<(Vec<u8>, Vec<u8>)>,
285        ttl_ms: Option<u64>,
286    ) {
287        // Hash keys are SmallBytes; values stay Vec<u8>. From-iter converts.
288        let hash_data: HashData = fields
289            .into_iter()
290            .map(|(f, v)| (SmallBytes::from_vec(f), v))
291            .collect();
292        self.insert_loaded(key, Value::Hash(Arc::new(hash_data)), ttl_ms);
293    }
294
295    pub fn load_list(&mut self, key: Vec<u8>, items: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
296        self.insert_loaded(key, Value::List(Arc::new(items.into_iter().collect())), ttl_ms);
297    }
298
299    pub fn load_set(&mut self, key: Vec<u8>, members: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
300        let set_data: SetData = members.into_iter().map(SmallBytes::from_vec).collect();
301        self.insert_loaded(key, Value::Set(Arc::new(set_data)), ttl_ms);
302    }
303
304    /// Collect live keys (optionally matching a glob `pattern`, up to `limit`).
305    /// Used by KEYS/SCAN/RANDOMKEY. Treats expired keys as absent (no removal).
306    pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
307        let now = now_ns();
308        let mut out = Vec::new();
309        for (k, e) in &self.map {
310            if e.is_expired_at(now) {
311                continue;
312            }
313            if let Some(p) = pattern
314                && !glob_match(p, k.as_slice())
315            {
316                continue;
317            }
318            out.push(k.to_vec());
319            if limit.is_some_and(|lim| out.len() >= lim) {
320                break;
321            }
322        }
323        out
324    }
325
326    pub fn load_zset(&mut self, key: Vec<u8>, pairs: Vec<(Vec<u8>, f64)>, ttl_ms: Option<u64>) {
327        let mut z = ZSetData::default();
328        for (m, score) in pairs {
329            z.insert(&m, score);
330        }
331        self.insert_loaded(key, Value::ZSet(Arc::new(z)), ttl_ms);
332    }
333
334    /// Insert one already-typed `(key, value, ttl)` triple, e.g. straight out
335    /// of another store's [`Self::snapshot_each`] — the redistribution step
336    /// both reshard paths (embedded `shards` bring-up, server routing
337    /// migration) use to re-home keys after a layout change.
338    pub fn load_value(&mut self, key: &[u8], value: &Value, ttl_ms: Option<u64>) {
339        let k = key.to_vec();
340        match value {
341            Value::Str(v) => self.load_str(k, v.to_vec(), ttl_ms),
342            // L2: snapshot/replication load keeps the encoding — store as
343            // Int directly to preserve the in-memory shape (and avoid the
344            // SET-detect parse on the load path).
345            Value::Int(n) => {
346                self.insert_entry(
347                    crate::value::SmallBytes::from_slice(&k),
348                    crate::Entry::new(Value::Int(*n), ttl_ms.map(|ms| crate::deadline_at(crate::now_ns(), std::time::Duration::from_millis(ms)))),
349                );
350            }
351            // L1: preserve the Arc-backed encoding on snapshot/replication
352            // load. Arc::clone is cheap; avoids re-copying the bytes.
353            Value::ArcBulk(a) => {
354                self.insert_entry(
355                    crate::value::SmallBytes::from_slice(&k),
356                    crate::Entry::new(Value::ArcBulk(a.clone()), ttl_ms.map(|ms| crate::deadline_at(crate::now_ns(), std::time::Duration::from_millis(ms)))),
357                );
358            }
359            Value::Hash(h) => self.load_hash(
360                k,
361                h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
362                ttl_ms,
363            ),
364            // A.8: same shape as A.7 — re-materialise to the heap-backed
365            // variant on snapshot/replication load. First mutation that
366            // targets the key will go through the encoding-switch path
367            // and (if size still fits) re-promote to the inline variant.
368            Value::SmallHashInline(h) => self.load_hash(
369                k,
370                h.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
371                ttl_ms,
372            ),
373            Value::List(l) => self.load_list(k, l.iter().cloned().collect(), ttl_ms),
374            Value::SmallListInline(l) => self.load_list(
375                k,
376                l.iter().map(<[u8]>::to_vec).collect(),
377                ttl_ms,
378            ),
379            Value::Set(s) => self.load_set(k, s.iter().map(kevy_bytes::SmallBytes::to_vec).collect(), ttl_ms),
380            // A.7 O5: snapshot/replication load — re-materialise the
381            // inline-encoded set as a `Value::Set`-backed `KevySet`. We
382            // don't preserve the SmallSetInline encoding on reload
383            // because (a) the upgrade path will naturally rebuild it on
384            // the first SADD that targets the key, and (b) the snapshot
385            // wire format already uses the OP_SET length-prefixed
386            // payload — losing the inline encoding bit costs nothing
387            // beyond one re-promotion on the first mutation.
388            Value::SmallSetInline(s) => self.load_set(
389                k,
390                s.iter_slices().map(<[u8]>::to_vec).collect(),
391                ttl_ms,
392            ),
393            Value::ZSet(z) => self.load_zset(
394                k,
395                z.ordered().map(|(m, sc)| (m.to_vec(), sc)).collect(),
396                ttl_ms,
397            ),
398            Value::SmallZSetInline(z) => self.load_zset(
399                k,
400                z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect(),
401                ttl_ms,
402            ),
403            Value::Stream(st) => {
404                let entries: Vec<crate::stream::LoadedStreamEntry> = st
405                    .iter_entries()
406                    .map(|(id, fv)| {
407                        let fvv = fv
408                            .iter()
409                            .map(|(f, v)| (f.as_slice().to_vec(), v.as_slice().to_vec()))
410                            .collect();
411                        (id.ms, id.seq, fvv)
412                    })
413                    .collect();
414                let last = st.last_id();
415                let mxd = st.max_deleted_id();
416                self.load_stream(
417                    k,
418                    entries,
419                    (last.ms, last.seq),
420                    (mxd.ms, mxd.seq),
421                    st.entries_added(),
422                    st.export_groups(),
423                    ttl_ms,
424                );
425            }
426        }
427    }
428
429    /// Snapshot-load a stream: every entry plus the per-stream scalar
430    /// state (last_id, max_deleted_id, entries_added) and the consumer
431    /// groups are restored verbatim. Caller passes already-decoded
432    /// primitive tuples; this fn does the [`SmallBytes`] /
433    /// [`crate::StreamData`] conversion.
434    #[allow(clippy::too_many_arguments)]
435    pub fn load_stream(
436        &mut self,
437        key: Vec<u8>,
438        entries: Vec<crate::stream::LoadedStreamEntry>,
439        last_id: (u64, u64),
440        max_deleted_id: (u64, u64),
441        entries_added: u64,
442        groups: Vec<crate::stream::LoadedGroup>,
443        ttl_ms: Option<u64>,
444    ) {
445        let mut s = crate::stream::StreamData::default();
446        for (ms, seq, fv) in entries {
447            let id = crate::stream::StreamId { ms, seq };
448            let fv_small: Vec<(SmallBytes, SmallBytes)> = fv
449                .into_iter()
450                .map(|(f, v)| (SmallBytes::from_vec(f), SmallBytes::from_vec(v)))
451                .collect();
452            s.load_entry(id, fv_small);
453        }
454        s.set_loaded_state(
455            crate::stream::StreamId { ms: last_id.0, seq: last_id.1 },
456            crate::stream::StreamId { ms: max_deleted_id.0, seq: max_deleted_id.1 },
457            entries_added,
458        );
459        s.import_groups(groups);
460        self.insert_loaded(key, Value::Stream(Arc::new(s)), ttl_ms);
461    }
462}