Skip to main content

kevy_embedded/
ops.rs

//! Data-type methods on [`Store`] — string, hash, list, set, sorted set,
//! plus the pub/sub `publish` / `subscribe` / `psubscribe` entry points.
//!
//! All of these are thin facades over `kevy_store::Store` (the keyspace)
//! and `pubsub::PubsubBus` (the in-process bus); they take the lock guarding
//! the data they touch (the shard that owns the key, for keyspace calls) for
//! the duration of the underlying call, then drop it. AOF logging and the
//! post-write eviction sweep run via `commit_write` from `store.rs`.
//! Behaviour and ABI are unchanged from the v1.1.0 single-file layout — this
//! module only exists to keep `store.rs` under the 500-LOC cap.
11
12use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21    // ---- string ops -----------------------------------------------------
22
23    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under the
24    /// embedded API (Redis semantics: SET overwrites; NX/XX vetoes would
25    /// return `false` but we don't expose those here — use [`Store::with`]
26    /// for the full surface).
27    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28        let mut g = self.wshard(key);
29        let ok = g.store.set(key, value.to_vec(), None, false, false);
30        commit_write(&mut g, &[b"SET", key, value])?;
31        Ok(ok)
32    }
33
34    /// `SET key value PX ms` — overwrites + sets TTL. The AOF records an
35    /// **absolute** `PEXPIREAT` deadline (not the relative `ttl`) so the key
36    /// expires at the same wall-clock instant after a restart — a relative
37    /// `PEXPIRE` would be re-anchored to replay-time, resetting the TTL to a
38    /// fresh full duration on every restart (INC-2026-06-09).
39    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40        let mut g = self.wshard(key);
41        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43        let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44        commit_write(&mut g, &[b"SET", key, value])?;
45        commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46        Ok(ok)
47    }
48
49    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
50    ///
51    /// With eviction off (`maxmemory == 0`, the default) this takes the **read**
52    /// lock and a non-mutating store lookup, so concurrent readers scale across
53    /// cores — the path a read-heavy embed cache lives on. With eviction on it
54    /// falls back to the exclusive lock + mutating get so each access still
55    /// stamps the LRU clock.
56    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
57        if self.config().maxmemory == 0 {
58            let g = self.rshard(key);
59            return Ok(g.store.get_shared(key).map_err(store_err)?.map(|v| v.to_vec()));
60        }
61        let mut g = self.wshard(key);
62        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
63    }
64
65    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
66    /// Keys fan out to their owning shards.
67    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
68        let mut total = 0;
69        for k in keys {
70            let owned = vec![k.to_vec()];
71            let mut g = self.wshard(k);
72            let n = g.store.del(&owned);
73            if n > 0 {
74                total += n;
75                commit_write(&mut g, &[b"DEL", k])?;
76            }
77        }
78        Ok(total)
79    }
80
81    /// `EXISTS key1 [key2 ...]`. Count of existing keys (duplicates counted
82    /// multiple times, matching Redis).
83    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
84        let mut total = 0;
85        for k in keys {
86            total += self.wshard(k).store.exists(&[k.to_vec()]);
87        }
88        Ok(total)
89    }
90
91    /// `INCR key`. Returns the post-increment value.
92    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
93        self.incr_by(key, 1)
94    }
95
96    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
97    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
98        let mut g = self.wshard(key);
99        let n = g.store.incr_by(key, delta).map_err(store_err)?;
100        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
101        Ok(n)
102    }
103
104    /// `EXPIRE key seconds`. Returns `true` if a key was touched. The AOF
105    /// records an absolute `PEXPIREAT` deadline (see [`Self::set_with_ttl`])
106    /// so the TTL survives a restart unchanged.
107    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
108        let mut g = self.wshard(key);
109        let touched = g.store.expire(key, ttl);
110        if touched {
111            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
112            let deadline = kevy_store::now_unix_ms().saturating_add(ms);
113            commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
114        }
115        Ok(touched)
116    }
117
118    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
119    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
120        let mut g = self.wshard(key);
121        let touched = g.store.persist(key);
122        if touched {
123            commit_write(&mut g, &[b"PERSIST", key])?;
124        }
125        Ok(touched)
126    }
127
128    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
129    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
130        self.wshard(key).store.pttl(key)
131    }
132
133    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
134    pub fn type_of(&self, key: &[u8]) -> &'static str {
135        self.wshard(key).store.type_of(key)
136    }
137
138    /// `DBSIZE` — total live keys across all shards.
139    pub fn dbsize(&self) -> usize {
140        self.sum_shards(|i| i.store.dbsize())
141    }
142
143    /// `FLUSHALL` — empty every shard (each logs `FLUSHALL` so a replay reaches
144    /// the same empty state).
145    pub fn flush(&self) -> io::Result<()> {
146        self.try_for_each_shard(|inner| {
147            inner.store.flush();
148            commit_write(inner, &[b"FLUSHALL"])
149        })
150    }
151
152    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
153    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
154        self.wshard(key).store.estimate_key_bytes(key)
155    }
156
157    /// Live `used_memory` estimate (summed across shards).
158    pub fn used_memory(&self) -> u64 {
159        self.sum_shards_u64(|i| i.store.used_memory())
160    }
161
162    /// `INFO`-style counter: total keys evicted by `maxmemory` (all shards).
163    pub fn evictions_total(&self) -> u64 {
164        self.sum_shards_u64(|i| i.store.evictions_total())
165    }
166
167    /// `INFO`-style counter: total keys expired (lazy + active reaper, all shards).
168    pub fn expired_keys_total(&self) -> u64 {
169        self.sum_shards_u64(|i| i.store.expired_keys_total())
170    }
171
172    // ---- hash ops -------------------------------------------------------
173
174    /// `HSET key field value [field value ...]`. Returns count newly added.
175    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
176        let mut g = self.wshard(key);
177        let owned: Vec<(Vec<u8>, Vec<u8>)> =
178            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
179        let added = g.store.hset(key, &owned).map_err(store_err)?;
180        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
181        parts.push(b"HSET");
182        parts.push(key);
183        for (f, v) in pairs {
184            parts.push(f);
185            parts.push(v);
186        }
187        commit_write(&mut g, &parts)?;
188        Ok(added)
189    }
190
191    /// `HGET key field`. `None` if absent.
192    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
193        let mut g = self.wshard(key);
194        Ok(g.store
195            .hget(key, field)
196            .map_err(store_err)?
197            .map(|v| v.to_vec()))
198    }
199
200    /// `HDEL key field [field ...]`. Returns count actually removed.
201    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
202        let mut g = self.wshard(key);
203        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
204        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
205        if removed > 0 {
206            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
207            parts.push(b"HDEL");
208            parts.push(key);
209            for f in fields {
210                parts.push(f);
211            }
212            commit_write(&mut g, &parts)?;
213        }
214        Ok(removed)
215    }
216
217    // ---- list ops -------------------------------------------------------
218
219    /// `LPUSH key value [value ...]`. Returns the new list length.
220    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
221        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
222    }
223
224    /// `RPUSH key value [value ...]`. Returns the new list length.
225    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
226        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
227    }
228
229    /// `LPOP key count`. Returns popped values from the head.
230    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
231        pop_helper(self, key, count, false)
232    }
233
234    /// `RPOP key count`. Symmetric to `LPOP` from the tail.
235    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
236        pop_helper(self, key, count, true)
237    }
238
239    /// `LLEN key`. Length of the list at `key`; 0 if absent.
240    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
241        self.wshard(key).store.llen(key).map_err(store_err)
242    }
243
244    // ---- set ops --------------------------------------------------------
245
246    /// `SADD key member [member ...]`. Returns count newly added.
247    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
248        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
249    }
250
251    /// `SREM key member [member ...]`. Returns count actually removed.
252    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
253        let mut g = self.wshard(key);
254        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
255        let removed = g.store.srem(key, &owned).map_err(store_err)?;
256        if removed > 0 {
257            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
258            parts.push(b"SREM");
259            parts.push(key);
260            for m in members {
261                parts.push(m);
262            }
263            commit_write(&mut g, &parts)?;
264        }
265        Ok(removed)
266    }
267
268    /// `SMEMBERS key`. Order implementation-defined; empty if absent.
269    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
270        self.wshard(key).store.smembers(key).map_err(store_err)
271    }
272
273    /// `SCARD key`. Member count; 0 if absent.
274    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
275        self.wshard(key).store.scard(key).map_err(store_err)
276    }
277
278    // ---- zset ops -------------------------------------------------------
279
280    /// `ZADD key score member [score member ...]`. Returns count newly added.
281    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
282        let mut g = self.wshard(key);
283        let owned: Vec<(f64, Vec<u8>)> =
284            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
285        let added = g.store.zadd(key, &owned).map_err(store_err)?;
286        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
287        for (s, _) in pairs {
288            score_strs.push(format!("{s}").into_bytes());
289        }
290        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
291        parts.push(b"ZADD");
292        parts.push(key);
293        for (i, (_, m)) in pairs.iter().enumerate() {
294            parts.push(&score_strs[i]);
295            parts.push(m);
296        }
297        commit_write(&mut g, &parts)?;
298        Ok(added)
299    }
300
301    /// `ZREM key member [member ...]`. Returns count actually removed.
302    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
303        let mut g = self.wshard(key);
304        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
305        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
306        if removed > 0 {
307            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
308            parts.push(b"ZREM");
309            parts.push(key);
310            for m in members {
311                parts.push(m);
312            }
313            commit_write(&mut g, &parts)?;
314        }
315        Ok(removed)
316    }
317
318    /// `ZSCORE key member`. `Some(score)` if present.
319    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
320        self.wshard(key).store.zscore(key, member).map_err(store_err)
321    }
322
323    /// `ZCARD key`. Member count; 0 if absent.
324    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
325        self.wshard(key).store.zcard(key).map_err(store_err)
326    }
327
328    // ---- pub/sub --------------------------------------------------------
329
330    /// `PUBLISH channel payload`. Delivers `payload` to every subscriber on
331    /// `channel` (direct + pattern matches) inside this process. Returns
332    /// the count of receivers the message reached.
333    pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
334        // Clone matching senders under the lock, then release before
335        // send() so a slow receiver can't stall unrelated traffic.
336        let plans = {
337            // Pub/sub is process-wide; the bus lives on shard 0.
338            let g = self.lock();
339            g.bus.collect_delivery(channel, payload)
340        };
341        let mut count = 0;
342        for (frame, sender) in plans {
343            if sender.send(frame).is_ok() {
344                count += 1;
345            }
346        }
347        count
348    }
349
350    /// Open a [`Subscription`] subscribed to `channels`. Drop the handle
351    /// to unsubscribe from everything atomically. Pass `&[]` to start
352    /// with no subscriptions and add some later via
353    /// [`Subscription::subscribe`] / [`Subscription::psubscribe`].
354    pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
355        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
356        if !channels.is_empty() {
357            sub.subscribe(channels);
358        }
359        sub
360    }
361
362    /// Convenience: open a [`Subscription`] starting on pattern subscriptions.
363    pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
364        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
365        if !patterns.is_empty() {
366            sub.psubscribe(patterns);
367        }
368        sub
369    }
370}
371
372// ─────────────────────────────────────────────────────────────────────────
373// Shared list/set push + list pop helpers. `&Store` so we can lock + AOF-log.
374// ─────────────────────────────────────────────────────────────────────────
375
376fn push_helper<F>(
377    s: &Store,
378    key: &[u8],
379    values: &[&[u8]],
380    verb: &'static [u8],
381    op: F,
382) -> io::Result<usize>
383where
384    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
385{
386    let mut g = s.wshard(key);
387    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
388    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
389    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
390    parts.push(verb);
391    parts.push(key);
392    for v in values {
393        parts.push(v);
394    }
395    commit_write(&mut g, &parts)?;
396    Ok(n)
397}
398
399fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
400    let mut g = s.wshard(key);
401    let popped = if from_tail {
402        g.store.rpop(key, count).map_err(store_err)?
403    } else {
404        g.store.lpop(key, count).map_err(store_err)?
405    };
406    if !popped.is_empty() {
407        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
408        let count_str = popped.len().to_string();
409        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
410        commit_write(&mut g, &parts)?;
411    }
412    Ok(popped)
413}