Skip to main content

kevy_embedded/
ops.rs

1//! Data-type methods on [`Store`] — string, hash, list, set, sorted set,
2//! plus the pub/sub `publish` / `subscribe` / `psubscribe` entry points.
3//!
4//! All of these are thin facades over `kevy_store::Store` (the keyspace)
5//! and `pubsub::PubsubBus` (the in-process bus); they hold the embedded
6//! mutex for the duration of the underlying call, then drop it. AOF
7//! logging + post-write eviction sweep run via `commit_write` from
8//! `store.rs`. Behaviour and ABI are unchanged from the v1.1.0 single-file
9//! layout — this module only exists to keep `store.rs` under the 500-LOC
10//! cap.
11
12use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21    // ---- string ops -----------------------------------------------------
22
23    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under the
24    /// embedded API (Redis semantics: SET overwrites; NX/XX vetoes would
25    /// return `false` but we don't expose those here — use [`Store::with`]
26    /// for the full surface).
27    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28        let mut g = self.lock();
29        let ok = g.store.set(key, value.to_vec(), None, false, false);
30        commit_write(&mut g, &[b"SET", key, value])?;
31        Ok(ok)
32    }
33
34    /// `SET key value PX ms` — overwrites + sets TTL. The AOF records an
35    /// **absolute** `PEXPIREAT` deadline (not the relative `ttl`) so the key
36    /// expires at the same wall-clock instant after a restart — a relative
37    /// `PEXPIRE` would be re-anchored to replay-time, resetting the TTL to a
38    /// fresh full duration on every restart (INC-2026-06-09).
39    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40        let mut g = self.lock();
41        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43        let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44        commit_write(&mut g, &[b"SET", key, value])?;
45        commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46        Ok(ok)
47    }
48
49    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
50    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
51        let mut g = self.lock();
52        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
53    }
54
55    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
56    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
57        let mut g = self.lock();
58        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
59        let n = g.store.del(&owned);
60        if n > 0 {
61            let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
62            parts.push(b"DEL");
63            for k in keys {
64                parts.push(k);
65            }
66            commit_write(&mut g, &parts)?;
67        }
68        Ok(n)
69    }
70
71    /// `EXISTS key1 [key2 ...]`. Count of existing keys (duplicates counted
72    /// multiple times, matching Redis).
73    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
74        let mut g = self.lock();
75        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
76        Ok(g.store.exists(&owned))
77    }
78
79    /// `INCR key`. Returns the post-increment value.
80    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
81        self.incr_by(key, 1)
82    }
83
84    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
85    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
86        let mut g = self.lock();
87        let n = g.store.incr_by(key, delta).map_err(store_err)?;
88        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
89        Ok(n)
90    }
91
92    /// `EXPIRE key seconds`. Returns `true` if a key was touched. The AOF
93    /// records an absolute `PEXPIREAT` deadline (see [`Self::set_with_ttl`])
94    /// so the TTL survives a restart unchanged.
95    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
96        let mut g = self.lock();
97        let touched = g.store.expire(key, ttl);
98        if touched {
99            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
100            let deadline = kevy_store::now_unix_ms().saturating_add(ms);
101            commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
102        }
103        Ok(touched)
104    }
105
106    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
107    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
108        let mut g = self.lock();
109        let touched = g.store.persist(key);
110        if touched {
111            commit_write(&mut g, &[b"PERSIST", key])?;
112        }
113        Ok(touched)
114    }
115
116    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
117    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
118        self.lock().store.pttl(key)
119    }
120
121    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
122    pub fn type_of(&self, key: &[u8]) -> &'static str {
123        self.lock().store.type_of(key)
124    }
125
126    /// `DBSIZE` — total live keys.
127    pub fn dbsize(&self) -> usize {
128        self.lock().store.dbsize()
129    }
130
131    /// `FLUSHALL` — empty the keyspace (logged so a replay reaches the same
132    /// empty state).
133    pub fn flush(&self) -> io::Result<()> {
134        let mut g = self.lock();
135        g.store.flush();
136        commit_write(&mut g, &[b"FLUSHALL"])?;
137        Ok(())
138    }
139
140    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
141    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
142        self.lock().store.estimate_key_bytes(key)
143    }
144
145    /// Live `used_memory` estimate (matches `INFO memory`'s field).
146    pub fn used_memory(&self) -> u64 {
147        self.lock().store.used_memory()
148    }
149
150    /// `INFO`-style counter: total keys evicted by `maxmemory` so far.
151    pub fn evictions_total(&self) -> u64 {
152        self.lock().store.evictions_total()
153    }
154
155    /// `INFO`-style counter: total keys expired (lazy + active reaper).
156    pub fn expired_keys_total(&self) -> u64 {
157        self.lock().store.expired_keys_total()
158    }
159
160    // ---- hash ops -------------------------------------------------------
161
162    /// `HSET key field value [field value ...]`. Returns count newly added.
163    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
164        let mut g = self.lock();
165        let owned: Vec<(Vec<u8>, Vec<u8>)> =
166            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
167        let added = g.store.hset(key, &owned).map_err(store_err)?;
168        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
169        parts.push(b"HSET");
170        parts.push(key);
171        for (f, v) in pairs {
172            parts.push(f);
173            parts.push(v);
174        }
175        commit_write(&mut g, &parts)?;
176        Ok(added)
177    }
178
179    /// `HGET key field`. `None` if absent.
180    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
181        let mut g = self.lock();
182        Ok(g.store
183            .hget(key, field)
184            .map_err(store_err)?
185            .map(|v| v.to_vec()))
186    }
187
188    /// `HDEL key field [field ...]`. Returns count actually removed.
189    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
190        let mut g = self.lock();
191        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
192        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
193        if removed > 0 {
194            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
195            parts.push(b"HDEL");
196            parts.push(key);
197            for f in fields {
198                parts.push(f);
199            }
200            commit_write(&mut g, &parts)?;
201        }
202        Ok(removed)
203    }
204
205    // ---- list ops -------------------------------------------------------
206
207    /// `LPUSH key value [value ...]`. Returns the new list length.
208    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
209        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
210    }
211
212    /// `RPUSH key value [value ...]`. Returns the new list length.
213    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
214        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
215    }
216
217    /// `LPOP key count`. Returns popped values from the head.
218    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
219        pop_helper(self, key, count, false)
220    }
221
222    /// `RPOP key count`. Symmetric to `LPOP` from the tail.
223    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
224        pop_helper(self, key, count, true)
225    }
226
227    /// `LLEN key`. Length of the list at `key`; 0 if absent.
228    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
229        self.lock().store.llen(key).map_err(store_err)
230    }
231
232    // ---- set ops --------------------------------------------------------
233
234    /// `SADD key member [member ...]`. Returns count newly added.
235    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
236        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
237    }
238
239    /// `SREM key member [member ...]`. Returns count actually removed.
240    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
241        let mut g = self.lock();
242        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
243        let removed = g.store.srem(key, &owned).map_err(store_err)?;
244        if removed > 0 {
245            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
246            parts.push(b"SREM");
247            parts.push(key);
248            for m in members {
249                parts.push(m);
250            }
251            commit_write(&mut g, &parts)?;
252        }
253        Ok(removed)
254    }
255
256    /// `SMEMBERS key`. Order implementation-defined; empty if absent.
257    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
258        self.lock().store.smembers(key).map_err(store_err)
259    }
260
261    /// `SCARD key`. Member count; 0 if absent.
262    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
263        self.lock().store.scard(key).map_err(store_err)
264    }
265
266    // ---- zset ops -------------------------------------------------------
267
268    /// `ZADD key score member [score member ...]`. Returns count newly added.
269    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
270        let mut g = self.lock();
271        let owned: Vec<(f64, Vec<u8>)> =
272            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
273        let added = g.store.zadd(key, &owned).map_err(store_err)?;
274        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
275        for (s, _) in pairs {
276            score_strs.push(format!("{s}").into_bytes());
277        }
278        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
279        parts.push(b"ZADD");
280        parts.push(key);
281        for (i, (_, m)) in pairs.iter().enumerate() {
282            parts.push(&score_strs[i]);
283            parts.push(m);
284        }
285        commit_write(&mut g, &parts)?;
286        Ok(added)
287    }
288
289    /// `ZREM key member [member ...]`. Returns count actually removed.
290    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
291        let mut g = self.lock();
292        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
293        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
294        if removed > 0 {
295            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
296            parts.push(b"ZREM");
297            parts.push(key);
298            for m in members {
299                parts.push(m);
300            }
301            commit_write(&mut g, &parts)?;
302        }
303        Ok(removed)
304    }
305
306    /// `ZSCORE key member`. `Some(score)` if present.
307    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
308        self.lock().store.zscore(key, member).map_err(store_err)
309    }
310
311    /// `ZCARD key`. Member count; 0 if absent.
312    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
313        self.lock().store.zcard(key).map_err(store_err)
314    }
315
316    // ---- pub/sub --------------------------------------------------------
317
318    /// `PUBLISH channel payload`. Delivers `payload` to every subscriber on
319    /// `channel` (direct + pattern matches) inside this process. Returns
320    /// the count of receivers the message reached.
321    pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
322        // Clone matching senders under the lock, then release before
323        // send() so a slow receiver can't stall unrelated traffic.
324        let plans = {
325            let g = self.lock();
326            g.bus.collect_delivery(channel, payload)
327        };
328        let mut count = 0;
329        for (frame, sender) in plans {
330            if sender.send(frame).is_ok() {
331                count += 1;
332            }
333        }
334        count
335    }
336
337    /// Open a [`Subscription`] subscribed to `channels`. Drop the handle
338    /// to unsubscribe from everything atomically. Pass `&[]` to start
339    /// with no subscriptions and add some later via
340    /// [`Subscription::subscribe`] / [`Subscription::psubscribe`].
341    pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
342        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
343        if !channels.is_empty() {
344            sub.subscribe(channels);
345        }
346        sub
347    }
348
349    /// Convenience: open a [`Subscription`] starting on pattern subscriptions.
350    pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
351        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
352        if !patterns.is_empty() {
353            sub.psubscribe(patterns);
354        }
355        sub
356    }
357}
358
359// ─────────────────────────────────────────────────────────────────────────
360// Shared list/set push + list pop helpers. `&Store` so we can lock + AOF-log.
361// ─────────────────────────────────────────────────────────────────────────
362
363fn push_helper<F>(
364    s: &Store,
365    key: &[u8],
366    values: &[&[u8]],
367    verb: &'static [u8],
368    op: F,
369) -> io::Result<usize>
370where
371    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
372{
373    let mut g = s.lock();
374    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
375    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
376    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
377    parts.push(verb);
378    parts.push(key);
379    for v in values {
380        parts.push(v);
381    }
382    commit_write(&mut g, &parts)?;
383    Ok(n)
384}
385
386fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
387    let mut g = s.lock();
388    let popped = if from_tail {
389        g.store.rpop(key, count).map_err(store_err)?
390    } else {
391        g.store.lpop(key, count).map_err(store_err)?
392    };
393    if !popped.is_empty() {
394        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
395        let count_str = popped.len().to_string();
396        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
397        commit_write(&mut g, &parts)?;
398    }
399    Ok(popped)
400}