Skip to main content

kevy_embedded/
ops.rs

1//! Data-type methods on [`Store`] — string, hash, list, set, sorted set,
2//! plus the pub/sub `publish` / `subscribe` / `psubscribe` entry points.
3//!
4//! All of these are thin facades over `kevy_store::Store` (the keyspace)
5//! and `pubsub::PubsubBus` (the in-process bus); they hold the embedded
6//! mutex for the duration of the underlying call, then drop it. AOF
7//! logging + post-write eviction sweep run via `commit_write` from
8//! `store.rs`. Behaviour and ABI are unchanged from the v1.1.0 single-file
9//! layout — this module only exists to keep `store.rs` under the 500-LOC
10//! cap.
11
12use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21    // ---- string ops -----------------------------------------------------
22
23    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under the
24    /// embedded API (Redis semantics: SET overwrites; NX/XX vetoes would
25    /// return `false` but we don't expose those here — use [`Store::with`]
26    /// for the full surface).
27    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28        let mut g = self.lock();
29        let ok = g.store.set(key, value.to_vec(), None, false, false);
30        commit_write(&mut g, &[b"SET", key, value])?;
31        Ok(ok)
32    }
33
34    /// `SET key value PX ms` — overwrites + sets TTL.
35    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
36        let mut g = self.lock();
37        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
38        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
39        commit_write(&mut g, &[b"SET", key, value])?;
40        commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
41        Ok(ok)
42    }
43
44    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
45    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
46        let mut g = self.lock();
47        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
48    }
49
50    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
51    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
52        let mut g = self.lock();
53        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
54        let n = g.store.del(&owned);
55        if n > 0 {
56            let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
57            parts.push(b"DEL");
58            for k in keys {
59                parts.push(k);
60            }
61            commit_write(&mut g, &parts)?;
62        }
63        Ok(n)
64    }
65
66    /// `EXISTS key1 [key2 ...]`. Count of existing keys (duplicates counted
67    /// multiple times, matching Redis).
68    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
69        let mut g = self.lock();
70        let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
71        Ok(g.store.exists(&owned))
72    }
73
74    /// `INCR key`. Returns the post-increment value.
75    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
76        self.incr_by(key, 1)
77    }
78
79    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
80    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
81        let mut g = self.lock();
82        let n = g.store.incr_by(key, delta).map_err(store_err)?;
83        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
84        Ok(n)
85    }
86
87    /// `EXPIRE key seconds`. Returns `true` if a key was touched.
88    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
89        let mut g = self.lock();
90        let touched = g.store.expire(key, ttl);
91        if touched {
92            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
93            commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
94        }
95        Ok(touched)
96    }
97
98    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
99    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
100        let mut g = self.lock();
101        let touched = g.store.persist(key);
102        if touched {
103            commit_write(&mut g, &[b"PERSIST", key])?;
104        }
105        Ok(touched)
106    }
107
108    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
109    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
110        self.lock().store.pttl(key)
111    }
112
113    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
114    pub fn type_of(&self, key: &[u8]) -> &'static str {
115        self.lock().store.type_of(key)
116    }
117
118    /// `DBSIZE` — total live keys.
119    pub fn dbsize(&self) -> usize {
120        self.lock().store.dbsize()
121    }
122
123    /// `FLUSHALL` — empty the keyspace (logged so a replay reaches the same
124    /// empty state).
125    pub fn flush(&self) -> io::Result<()> {
126        let mut g = self.lock();
127        g.store.flush();
128        commit_write(&mut g, &[b"FLUSHALL"])?;
129        Ok(())
130    }
131
132    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
133    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
134        self.lock().store.estimate_key_bytes(key)
135    }
136
137    /// Live `used_memory` estimate (matches `INFO memory`'s field).
138    pub fn used_memory(&self) -> u64 {
139        self.lock().store.used_memory()
140    }
141
142    /// `INFO`-style counter: total keys evicted by `maxmemory` so far.
143    pub fn evictions_total(&self) -> u64 {
144        self.lock().store.evictions_total()
145    }
146
147    /// `INFO`-style counter: total keys expired (lazy + active reaper).
148    pub fn expired_keys_total(&self) -> u64 {
149        self.lock().store.expired_keys_total()
150    }
151
152    // ---- hash ops -------------------------------------------------------
153
154    /// `HSET key field value [field value ...]`. Returns count newly added.
155    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
156        let mut g = self.lock();
157        let owned: Vec<(Vec<u8>, Vec<u8>)> =
158            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
159        let added = g.store.hset(key, &owned).map_err(store_err)?;
160        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
161        parts.push(b"HSET");
162        parts.push(key);
163        for (f, v) in pairs {
164            parts.push(f);
165            parts.push(v);
166        }
167        commit_write(&mut g, &parts)?;
168        Ok(added)
169    }
170
171    /// `HGET key field`. `None` if absent.
172    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
173        let mut g = self.lock();
174        Ok(g.store
175            .hget(key, field)
176            .map_err(store_err)?
177            .map(|v| v.to_vec()))
178    }
179
180    /// `HDEL key field [field ...]`. Returns count actually removed.
181    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
182        let mut g = self.lock();
183        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
184        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
185        if removed > 0 {
186            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
187            parts.push(b"HDEL");
188            parts.push(key);
189            for f in fields {
190                parts.push(f);
191            }
192            commit_write(&mut g, &parts)?;
193        }
194        Ok(removed)
195    }
196
197    // ---- list ops -------------------------------------------------------
198
199    /// `LPUSH key value [value ...]`. Returns the new list length.
200    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
201        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
202    }
203
204    /// `RPUSH key value [value ...]`. Returns the new list length.
205    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
206        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
207    }
208
209    /// `LPOP key count`. Returns popped values from the head.
210    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
211        pop_helper(self, key, count, false)
212    }
213
214    /// `RPOP key count`. Symmetric to `LPOP` from the tail.
215    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
216        pop_helper(self, key, count, true)
217    }
218
219    /// `LLEN key`. Length of the list at `key`; 0 if absent.
220    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
221        self.lock().store.llen(key).map_err(store_err)
222    }
223
224    // ---- set ops --------------------------------------------------------
225
226    /// `SADD key member [member ...]`. Returns count newly added.
227    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
228        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
229    }
230
231    /// `SREM key member [member ...]`. Returns count actually removed.
232    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
233        let mut g = self.lock();
234        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
235        let removed = g.store.srem(key, &owned).map_err(store_err)?;
236        if removed > 0 {
237            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
238            parts.push(b"SREM");
239            parts.push(key);
240            for m in members {
241                parts.push(m);
242            }
243            commit_write(&mut g, &parts)?;
244        }
245        Ok(removed)
246    }
247
248    /// `SMEMBERS key`. Order implementation-defined; empty if absent.
249    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
250        self.lock().store.smembers(key).map_err(store_err)
251    }
252
253    /// `SCARD key`. Member count; 0 if absent.
254    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
255        self.lock().store.scard(key).map_err(store_err)
256    }
257
258    // ---- zset ops -------------------------------------------------------
259
260    /// `ZADD key score member [score member ...]`. Returns count newly added.
261    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
262        let mut g = self.lock();
263        let owned: Vec<(f64, Vec<u8>)> =
264            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
265        let added = g.store.zadd(key, &owned).map_err(store_err)?;
266        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
267        for (s, _) in pairs {
268            score_strs.push(format!("{s}").into_bytes());
269        }
270        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
271        parts.push(b"ZADD");
272        parts.push(key);
273        for (i, (_, m)) in pairs.iter().enumerate() {
274            parts.push(&score_strs[i]);
275            parts.push(m);
276        }
277        commit_write(&mut g, &parts)?;
278        Ok(added)
279    }
280
281    /// `ZREM key member [member ...]`. Returns count actually removed.
282    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
283        let mut g = self.lock();
284        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
285        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
286        if removed > 0 {
287            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
288            parts.push(b"ZREM");
289            parts.push(key);
290            for m in members {
291                parts.push(m);
292            }
293            commit_write(&mut g, &parts)?;
294        }
295        Ok(removed)
296    }
297
298    /// `ZSCORE key member`. `Some(score)` if present.
299    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
300        self.lock().store.zscore(key, member).map_err(store_err)
301    }
302
303    /// `ZCARD key`. Member count; 0 if absent.
304    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
305        self.lock().store.zcard(key).map_err(store_err)
306    }
307
308    // ---- pub/sub --------------------------------------------------------
309
310    /// `PUBLISH channel payload`. Delivers `payload` to every subscriber on
311    /// `channel` (direct + pattern matches) inside this process. Returns
312    /// the count of receivers the message reached.
313    pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
314        // Clone matching senders under the lock, then release before
315        // send() so a slow receiver can't stall unrelated traffic.
316        let plans = {
317            let g = self.lock();
318            g.bus.collect_delivery(channel, payload)
319        };
320        let mut count = 0;
321        for (frame, sender) in plans {
322            if sender.send(frame).is_ok() {
323                count += 1;
324            }
325        }
326        count
327    }
328
329    /// Open a [`Subscription`] subscribed to `channels`. Drop the handle
330    /// to unsubscribe from everything atomically. Pass `&[]` to start
331    /// with no subscriptions and add some later via
332    /// [`Subscription::subscribe`] / [`Subscription::psubscribe`].
333    pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
334        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
335        if !channels.is_empty() {
336            sub.subscribe(channels);
337        }
338        sub
339    }
340
341    /// Convenience: open a [`Subscription`] starting on pattern subscriptions.
342    pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
343        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
344        if !patterns.is_empty() {
345            sub.psubscribe(patterns);
346        }
347        sub
348    }
349}
350
351// ─────────────────────────────────────────────────────────────────────────
352// Shared list/set push + list pop helpers. `&Store` so we can lock + AOF-log.
353// ─────────────────────────────────────────────────────────────────────────
354
355fn push_helper<F>(
356    s: &Store,
357    key: &[u8],
358    values: &[&[u8]],
359    verb: &'static [u8],
360    op: F,
361) -> io::Result<usize>
362where
363    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
364{
365    let mut g = s.lock();
366    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
367    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
368    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
369    parts.push(verb);
370    parts.push(key);
371    for v in values {
372        parts.push(v);
373    }
374    commit_write(&mut g, &parts)?;
375    Ok(n)
376}
377
378fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
379    let mut g = s.lock();
380    let popped = if from_tail {
381        g.store.rpop(key, count).map_err(store_err)?
382    } else {
383        g.store.lpop(key, count).map_err(store_err)?
384    };
385    if !popped.is_empty() {
386        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
387        let count_str = popped.len().to_string();
388        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
389        commit_write(&mut g, &parts)?;
390    }
391    Ok(popped)
392}