Skip to main content

kevy_store/
keyspace.rs

1//! Generic key operations + persistence hooks on [`Store`]:
2//! `del`/`exists`/`expire`/`persist`/`pttl`/`type_of`/`dbsize`/`flush`/
3//! `snapshot_each`/`load_*`/`collect_keys`. Type-agnostic; typed accessors
4//! live in the per-type modules (string/hash/list/set/zset).
5//!
6//! Split out of [`crate`] for file-size hygiene.
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::value::{HashData, SetData, Value, ZSetData};
12use crate::{
13    Entry, RenameOutcome, SmallBytes, Store, deadline_at, glob_match, now_ns, pack_deadline,
14    remaining_ms,
15};
16
17impl Store {
18    // ---- generic key ops (type-agnostic) -------------------------------
19
20    pub fn del(&mut self, keys: &[Vec<u8>]) -> usize {
21        let now = now_ns();
22        let mut removed = 0;
23        for k in keys {
24            if self.reap(k, now) && self.remove_entry(k.as_slice()).is_some() {
25                removed += 1;
26            }
27        }
28        removed
29    }
30
31    pub fn exists(&mut self, keys: &[Vec<u8>]) -> usize {
32        keys.iter().filter(|k| self.live_entry(k).is_some()).count()
33    }
34
35    pub fn expire(&mut self, key: &[u8], ttl: Duration) -> bool {
36        let now = now_ns();
37        if !self.reap(key, now) {
38            return false;
39        }
40        let Some(e) = self.map.get_mut(key) else {
41            return false;
42        };
43        let had = e.expire_at_ns.is_some();
44        e.expire_at_ns = pack_deadline(deadline_at(now, ttl));
45        let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
46        self.adjust_expires(delta);
47        true
48    }
49
50    /// `EXPIREAT`/`PEXPIREAT` semantics: set an **absolute** wall-clock
51    /// deadline (Unix epoch millis). This is the persistence-safe form —
52    /// a deadline survives restart unchanged, unlike the relative
53    /// [`Self::expire`] (whose duration is re-anchored to "now"). A
54    /// deadline already in the past deletes the key immediately (Redis
55    /// behaviour). Returns `true` iff the key existed (and was either
56    /// re-dated or deleted). The wall-clock → monotonic-`Instant`
57    /// conversion happens here so callers persist absolute time but the
58    /// hot path keeps its cheap monotonic deadline.
59    pub fn expire_at_unix_ms(&mut self, key: &[u8], deadline_ms: u64) -> bool {
60        let now = now_ns();
61        if !self.reap(key, now) || !self.map.contains_key(key) {
62            return false;
63        }
64        let wall_now = crate::now_unix_ms();
65        if deadline_ms <= wall_now {
66            // Past deadline: delete now, just like Redis EXPIREAT in the past.
67            self.remove_entry(key);
68            return true;
69        }
70        let remaining = Duration::from_millis(deadline_ms - wall_now);
71        if let Some(e) = self.map.get_mut(key) {
72            let had = e.expire_at_ns.is_some();
73            e.expire_at_ns = pack_deadline(deadline_at(now, remaining));
74            let delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had);
75            self.adjust_expires(delta);
76        }
77        true
78    }
79
80    /// Cross-shard RENAME step 1: atomically remove the entry at
81    /// `key` (if any), returning the `(value, ttl_ms_remaining)`. The
82    /// orchestrator on the origin shard ships the result into a
83    /// follow-up [`Self::put_with_ttl`] on the destination shard.
84    /// Lazy-reaps an expired entry before the take (so an expired
85    /// key is observed as `None`, not silently rehomed).
86    pub fn take_with_ttl(&mut self, key: &[u8]) -> Option<(Value, Option<u64>)> {
87        let now = now_ns();
88        if !self.reap(key, now) {
89            return None;
90        }
91        let entry = self.remove_entry(key)?;
92        let ttl_ms = entry.expire_at_ns.map(|ns| remaining_ms(ns, now));
93        Some((entry.value, ttl_ms))
94    }
95
96    /// Cross-shard RENAME step 2: write `value` at `key` on this
97    /// shard, overwriting any prior entry. `ttl_ms` is set as a TTL
98    /// relative to *now* (i.e. the orchestrator should have computed
99    /// the remaining TTL on the source shard via `take_with_ttl` and
100    /// is shipping that exact remaining value here).
101    pub fn put_with_ttl(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
102        let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
103        let entry = Entry::new(value, expire_at);
104        // Overwrite — drop any existing entry first so the accounting
105        // doesn't double-count.
106        self.remove_entry(&key);
107        self.insert_entry(SmallBytes::from_vec(key), entry);
108    }
109
110    /// Whether a live (non-expired) entry exists at `key`. Reaps an
111    /// expired entry as a side effect. Used by the cross-shard RENAME
112    /// orchestrator's `nx` pre-check.
113    pub fn key_exists(&mut self, key: &[u8]) -> bool {
114        let now = now_ns();
115        self.reap(key, now) && self.map.contains_key(key)
116    }
117
118    /// `RENAME` (or `RENAMENX` if `nx`). Atomic on this shard. Returns
119    /// the outcome so the dispatch layer can emit the right RESP frame
120    /// (RENAME: `+OK` or `-ERR no such key`; RENAMENX: `:1`/`:0`/error).
121    ///
122    /// Cross-shard rename is the runtime's job — by the time this is
123    /// called, both `src` and `dst` are guaranteed to live on the same
124    /// shard. See `kevy-rt::start_rename` for the cross-shard split.
125    pub fn rename(&mut self, src: &[u8], dst: &[u8], nx: bool) -> RenameOutcome {
126        let now = now_ns();
127        if !self.reap(src, now) {
128            return RenameOutcome::NoSuchSrc;
129        }
130        if src == dst {
131            // Redis 6+ semantics: same-key rename is a no-op `+OK`.
132            // (RENAMENX same-key returns `:0` per Redis since dst
133            // technically already exists at src's address.)
134            return if nx {
135                RenameOutcome::DstExists
136            } else {
137                RenameOutcome::Renamed
138            };
139        }
140        if nx {
141            // Reap dst before the existence test so a TTL-expired dst
142            // doesn't block the rename.
143            let dst_live = self.reap(dst, now) && self.map.contains_key(dst);
144            if dst_live {
145                return RenameOutcome::DstExists;
146            }
147        }
148        // Take src's entry out. `remove_entry` returns the full Entry
149        // (value + TTL) — preserves TTL across rename, matching Redis.
150        let Some(entry) = self.remove_entry(src) else {
151            return RenameOutcome::NoSuchSrc;
152        };
153        // Drop any pre-existing dst (overwrite semantics). reap above
154        // already handled TTL-expired dst, but the live-dst case still
155        // needs removal.
156        self.remove_entry(dst);
157        self.insert_entry(SmallBytes::from_vec(dst.to_vec()), entry);
158        RenameOutcome::Renamed
159    }
160
161    pub fn persist(&mut self, key: &[u8]) -> bool {
162        let now = now_ns();
163        if !self.reap(key, now) {
164            return false;
165        }
166        let cleared = match self.map.get_mut(key) {
167            Some(e) if e.expire_at_ns.is_some() => {
168                e.expire_at_ns = None;
169                true
170            }
171            _ => false,
172        };
173        if cleared {
174            self.adjust_expires(-1);
175        }
176        cleared
177    }
178
179    /// Remaining TTL in ms: `-2` no key, `-1` no expiry, else `>= 0`.
180    pub fn pttl(&mut self, key: &[u8]) -> i64 {
181        let now = now_ns();
182        if !self.reap(key, now) {
183            return -2;
184        }
185        match self.map.get(key).and_then(|e| e.expire_at_ns) {
186            None => -1,
187            Some(ns) => remaining_ms(ns, now) as i64,
188        }
189    }
190
191    pub fn type_of(&mut self, key: &[u8]) -> &'static str {
192        let now = now_ns();
193        if !self.reap(key, now) {
194            return "none";
195        }
196        self.map.get(key).map_or("none", |e| e.value.type_name())
197    }
198
199    pub fn dbsize(&self) -> usize {
200        self.map.len()
201    }
202
203    /// Wipe every key in this shard's keyspace (the `FLUSHALL`/`FLUSHDB`
204    /// primitive). Resets `used_memory`; `used_memory_peak` is
205    /// lifetime-cumulative and intentionally not reset.
206    ///
207    /// Named `flushall` — **not** `flush` — to avoid colliding with
208    /// `Write::flush`'s "sync buffered writes to disk" meaning. This method
209    /// DESTROYS data; it does not persist it.
210    pub fn flushall(&mut self) {
211        self.map.clear();
212        self.used_memory = 0;
213        self.expires = 0;
214        // peak is lifetime-cumulative; intentionally not reset.
215    }
216
217    /// Deprecated alias for [`Self::flushall`]. The old name read like
218    /// `Write::flush` (sync-to-disk) but actually WIPES the keyspace.
219    #[deprecated(
220        since = "1.17.0",
221        note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the keyspace"
222    )]
223    pub fn flush(&mut self) {
224        self.flushall();
225    }
226
227    /// Count live (non-expired) keys that carry a TTL — the size of the
228    /// "expire set" Redis tracks. Useful as an introspection signal for
229    /// confirming the TTL subsystem actually registered keys. O(n) over the
230    /// keyspace; call it for diagnostics, not on the hot path.
231    pub fn ttl_pending_count(&self) -> usize {
232        let now = now_ns();
233        self.map
234            .values()
235            .filter(|e| e.expire_at_ns.is_some() && !e.is_expired_at(now))
236            .count()
237    }
238
239    // ---- persistence hooks ---------------------------------------------
240
241    /// Visit every live entry as `(key, &value, ttl_ms)` for snapshotting.
242    pub fn snapshot_each<F: FnMut(&[u8], &Value, Option<u64>)>(&self, mut f: F) {
243        let now = now_ns();
244        for (k, e) in &self.map {
245            if e.is_expired_at(now) {
246                continue;
247            }
248            let ttl = e.expire_at_ns.map(|ns| remaining_ms(ns, now));
249            f(k.as_slice(), &e.value, ttl);
250        }
251    }
252
253    fn insert_loaded(&mut self, key: Vec<u8>, value: Value, ttl_ms: Option<u64>) {
254        let expire_at = ttl_ms.map(|ms| deadline_at(now_ns(), Duration::from_millis(ms)));
255        self.insert_entry(SmallBytes::from_vec(key), Entry::new(value, expire_at));
256    }
257
258    pub fn load_str(&mut self, key: Vec<u8>, value: Vec<u8>, ttl_ms: Option<u64>) {
259        self.insert_loaded(key, Value::Str(SmallBytes::from_vec(value)), ttl_ms);
260    }
261
262    pub fn load_hash(
263        &mut self,
264        key: Vec<u8>,
265        fields: Vec<(Vec<u8>, Vec<u8>)>,
266        ttl_ms: Option<u64>,
267    ) {
268        // Hash keys are SmallBytes; values stay Vec<u8>. From-iter converts.
269        let hash_data: HashData = fields
270            .into_iter()
271            .map(|(f, v)| (SmallBytes::from_vec(f), v))
272            .collect();
273        self.insert_loaded(key, Value::Hash(Arc::new(hash_data)), ttl_ms);
274    }
275
276    pub fn load_list(&mut self, key: Vec<u8>, items: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
277        self.insert_loaded(key, Value::List(Arc::new(items.into_iter().collect())), ttl_ms);
278    }
279
280    pub fn load_set(&mut self, key: Vec<u8>, members: Vec<Vec<u8>>, ttl_ms: Option<u64>) {
281        let set_data: SetData = members.into_iter().map(SmallBytes::from_vec).collect();
282        self.insert_loaded(key, Value::Set(Arc::new(set_data)), ttl_ms);
283    }
284
285    /// Collect live keys (optionally matching a glob `pattern`, up to `limit`).
286    /// Used by KEYS/SCAN/RANDOMKEY. Treats expired keys as absent (no removal).
287    pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
288        let now = now_ns();
289        let mut out = Vec::new();
290        for (k, e) in &self.map {
291            if e.is_expired_at(now) {
292                continue;
293            }
294            if let Some(p) = pattern
295                && !glob_match(p, k.as_slice())
296            {
297                continue;
298            }
299            out.push(k.to_vec());
300            if limit.is_some_and(|lim| out.len() >= lim) {
301                break;
302            }
303        }
304        out
305    }
306
307    pub fn load_zset(&mut self, key: Vec<u8>, pairs: Vec<(Vec<u8>, f64)>, ttl_ms: Option<u64>) {
308        let mut z = ZSetData::default();
309        for (m, score) in pairs {
310            z.insert(&m, score);
311        }
312        self.insert_loaded(key, Value::ZSet(Arc::new(z)), ttl_ms);
313    }
314
315    /// Insert one already-typed `(key, value, ttl)` triple, e.g. straight out
316    /// of another store's [`Self::snapshot_each`] — the redistribution step
317    /// both reshard paths (embedded `shards` bring-up, server routing
318    /// migration) use to re-home keys after a layout change.
319    pub fn load_value(&mut self, key: &[u8], value: &Value, ttl_ms: Option<u64>) {
320        let k = key.to_vec();
321        match value {
322            Value::Str(v) => self.load_str(k, v.to_vec(), ttl_ms),
323            Value::Hash(h) => self.load_hash(
324                k,
325                h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
326                ttl_ms,
327            ),
328            Value::List(l) => self.load_list(k, l.iter().cloned().collect(), ttl_ms),
329            Value::Set(s) => self.load_set(k, s.iter().map(kevy_bytes::SmallBytes::to_vec).collect(), ttl_ms),
330            Value::ZSet(z) => self.load_zset(
331                k,
332                z.ordered().map(|(m, sc)| (m.to_vec(), sc)).collect(),
333                ttl_ms,
334            ),
335            Value::Stream(st) => {
336                let entries: Vec<crate::stream::LoadedStreamEntry> = st
337                    .iter_entries()
338                    .map(|(id, fv)| {
339                        let fvv = fv
340                            .iter()
341                            .map(|(f, v)| (f.as_slice().to_vec(), v.as_slice().to_vec()))
342                            .collect();
343                        (id.ms, id.seq, fvv)
344                    })
345                    .collect();
346                let last = st.last_id();
347                let mxd = st.max_deleted_id();
348                self.load_stream(
349                    k,
350                    entries,
351                    (last.ms, last.seq),
352                    (mxd.ms, mxd.seq),
353                    st.entries_added(),
354                    st.export_groups(),
355                    ttl_ms,
356                );
357            }
358        }
359    }
360
361    /// Snapshot-load a stream: every entry plus the per-stream scalar
362    /// state (last_id, max_deleted_id, entries_added) and the consumer
363    /// groups are restored verbatim. Caller passes already-decoded
364    /// primitive tuples; this fn does the [`SmallBytes`] /
365    /// [`crate::StreamData`] conversion.
366    #[allow(clippy::too_many_arguments)]
367    pub fn load_stream(
368        &mut self,
369        key: Vec<u8>,
370        entries: Vec<crate::stream::LoadedStreamEntry>,
371        last_id: (u64, u64),
372        max_deleted_id: (u64, u64),
373        entries_added: u64,
374        groups: Vec<crate::stream::LoadedGroup>,
375        ttl_ms: Option<u64>,
376    ) {
377        let mut s = crate::stream::StreamData::default();
378        for (ms, seq, fv) in entries {
379            let id = crate::stream::StreamId { ms, seq };
380            let fv_small: Vec<(SmallBytes, SmallBytes)> = fv
381                .into_iter()
382                .map(|(f, v)| (SmallBytes::from_vec(f), SmallBytes::from_vec(v)))
383                .collect();
384            s.load_entry(id, fv_small);
385        }
386        s.set_loaded_state(
387            crate::stream::StreamId { ms: last_id.0, seq: last_id.1 },
388            crate::stream::StreamId { ms: max_deleted_id.0, seq: max_deleted_id.1 },
389            entries_added,
390        );
391        s.import_groups(groups);
392        self.insert_loaded(key, Value::Stream(Arc::new(s)), ttl_ms);
393    }
394}