Skip to main content

kevy_embedded/
ops.rs

//! Data-type methods on [`Store`] — string, hash, list, set, sorted set,
//! plus the pub/sub `publish` / `subscribe` / `psubscribe` entry points.
//!
//! All of these are thin facades over `kevy_store::Store` (the keyspace)
//! and `pubsub::PubsubBus` (the in-process bus); they hold the owning
//! shard's lock for the duration of the underlying call, then drop it. AOF
//! logging + post-write eviction sweep run via `commit_write` from
//! `store.rs`. Behaviour and ABI are unchanged from the v1.1.0 single-file
//! layout — this module only exists to keep `store.rs` under the 500-LOC
//! cap.
11
12use std::io;
13use std::time::Duration;
14
15use kevy_store::StoreError;
16
17use crate::pubsub::Subscription;
18use crate::store::{Store, commit_write, store_err};
19
20impl Store {
21    // ---- string ops -----------------------------------------------------
22
23    /// `SET key value` (no TTL, no NX/XX). Returns `true` always under the
24    /// embedded API (Redis semantics: SET overwrites; NX/XX vetoes would
25    /// return `false` but we don't expose those here — use [`Store::with`]
26    /// for the full surface).
27    pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
28        let mut g = self.wshard(key);
29        let ok = g.store.set(key, value.to_vec(), None, false, false);
30        commit_write(&mut g, &[b"SET", key, value])?;
31        Ok(ok)
32    }
33
34    /// `SET key value PX ms` — overwrites + sets TTL. The AOF records an
35    /// **absolute** `PEXPIREAT` deadline (not the relative `ttl`) so the key
36    /// expires at the same wall-clock instant after a restart — a relative
37    /// `PEXPIRE` would be re-anchored to replay-time, resetting the TTL to a
38    /// fresh full duration on every restart (INC-2026-06-09).
39    pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
40        let mut g = self.wshard(key);
41        let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
42        let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
43        let deadline = kevy_store::now_unix_ms().saturating_add(ms);
44        commit_write(&mut g, &[b"SET", key, value])?;
45        commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
46        Ok(ok)
47    }
48
49    /// `GET key` — `Some(bytes)` on hit, `None` on miss or expired.
50    ///
51    /// With eviction off (`maxmemory == 0`, the default) this takes the **read**
52    /// lock and a non-mutating store lookup, so concurrent readers scale across
53    /// cores — the path a read-heavy embed cache lives on. With eviction on it
54    /// falls back to the exclusive lock + mutating get so each access still
55    /// stamps the LRU clock.
56    pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
57        if self.config().maxmemory == 0 {
58            let g = self.rshard(key);
59            return Ok(g.store.get_shared(key).map_err(store_err)?.map(|v| v.to_vec()));
60        }
61        let mut g = self.wshard(key);
62        Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
63    }
64
65    /// `DEL key1 [key2 ...]`. Returns the count of keys actually removed.
66    /// Keys fan out to their owning shards.
67    pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
68        let mut total = 0;
69        for k in keys {
70            let owned = vec![k.to_vec()];
71            let mut g = self.wshard(k);
72            let n = g.store.del(&owned);
73            if n > 0 {
74                total += n;
75                commit_write(&mut g, &[b"DEL", k])?;
76            }
77        }
78        Ok(total)
79    }
80
81    /// `EXISTS key1 [key2 ...]`. Count of existing keys (duplicates counted
82    /// multiple times, matching Redis).
83    pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
84        let mut total = 0;
85        for k in keys {
86            total += self.wshard(k).store.exists(&[k.to_vec()]);
87        }
88        Ok(total)
89    }
90
91    /// `INCR key`. Returns the post-increment value.
92    pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
93        self.incr_by(key, 1)
94    }
95
96    /// `INCRBY key delta`. Negative `delta` does DECR-style work.
97    pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
98        let mut g = self.wshard(key);
99        let n = g.store.incr_by(key, delta).map_err(store_err)?;
100        commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
101        Ok(n)
102    }
103
104    /// `EXPIRE key seconds`. Returns `true` if a key was touched. The AOF
105    /// records an absolute `PEXPIREAT` deadline (see [`Self::set_with_ttl`])
106    /// so the TTL survives a restart unchanged.
107    pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
108        let mut g = self.wshard(key);
109        let touched = g.store.expire(key, ttl);
110        if touched {
111            let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
112            let deadline = kevy_store::now_unix_ms().saturating_add(ms);
113            commit_write(&mut g, &[b"PEXPIREAT", key, deadline.to_string().as_bytes()])?;
114        }
115        Ok(touched)
116    }
117
118    /// `PERSIST key`. Returns `true` if a TTL was actually cleared.
119    pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
120        let mut g = self.wshard(key);
121        let touched = g.store.persist(key);
122        if touched {
123            commit_write(&mut g, &[b"PERSIST", key])?;
124        }
125        Ok(touched)
126    }
127
128    /// Remaining TTL in ms (or Redis-style `-1`/`-2` for no-TTL/no-key).
129    pub fn ttl_ms(&self, key: &[u8]) -> i64 {
130        self.wshard(key).store.pttl(key)
131    }
132
133    /// `TYPE key` — `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"`.
134    pub fn type_of(&self, key: &[u8]) -> &'static str {
135        self.wshard(key).store.type_of(key)
136    }
137
138    /// `DBSIZE` — total live keys across all shards.
139    pub fn dbsize(&self) -> usize {
140        self.sum_shards(|i| i.store.dbsize())
141    }
142
143    /// `FLUSHALL` — empty every shard (each logs `FLUSHALL` so a replay reaches
144    /// the same empty state).
145    ///
146    /// Named `flushall` — **not** `flush` — to avoid colliding with
147    /// `Write::flush`'s "sync buffered writes to disk" meaning. This call
148    /// WIPES the store; durability needs no explicit call (each write appends
149    /// to the AOF, the shard's `BufWriter` lands per [`AppendFsync`] cadence
150    /// and on drop).
151    ///
152    /// [`AppendFsync`]: crate::AppendFsync
153    pub fn flushall(&self) -> io::Result<()> {
154        self.try_for_each_shard(|inner| {
155            inner.store.flushall();
156            commit_write(inner, &[b"FLUSHALL"])
157        })
158    }
159
160    /// Deprecated alias for [`Self::flushall`]. The old name read like
161    /// `Write::flush` (sync-to-disk) but actually WIPES the store — a
162    /// data-loss footgun.
163    #[deprecated(
164        since = "1.2.0",
165        note = "renamed to `flushall`: `flush` collides with Write::flush (sync-to-disk); this WIPES the store"
166    )]
167    pub fn flush(&self) -> io::Result<()> {
168        self.flushall()
169    }
170
171    /// `MEMORY USAGE` for one key — `Some(bytes)` or `None` if absent.
172    pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
173        self.wshard(key).store.estimate_key_bytes(key)
174    }
175
176    /// Live `used_memory` estimate (summed across shards).
177    pub fn used_memory(&self) -> u64 {
178        self.sum_shards_u64(|i| i.store.used_memory())
179    }
180
181    /// `INFO`-style counter: total keys evicted by `maxmemory` (all shards).
182    pub fn evictions_total(&self) -> u64 {
183        self.sum_shards_u64(|i| i.store.evictions_total())
184    }
185
186    /// `INFO`-style counter: total keys expired (lazy + active reaper, all shards).
187    pub fn expired_keys_total(&self) -> u64 {
188        self.sum_shards_u64(|i| i.store.expired_keys_total())
189    }
190
191    // ---- hash ops -------------------------------------------------------
192
193    /// `HSET key field value [field value ...]`. Returns count newly added.
194    pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
195        let mut g = self.wshard(key);
196        let owned: Vec<(Vec<u8>, Vec<u8>)> =
197            pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
198        let added = g.store.hset(key, &owned).map_err(store_err)?;
199        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
200        parts.push(b"HSET");
201        parts.push(key);
202        for (f, v) in pairs {
203            parts.push(f);
204            parts.push(v);
205        }
206        commit_write(&mut g, &parts)?;
207        Ok(added)
208    }
209
210    /// `HGET key field`. `None` if absent.
211    pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
212        let mut g = self.wshard(key);
213        Ok(g.store
214            .hget(key, field)
215            .map_err(store_err)?
216            .map(|v| v.to_vec()))
217    }
218
219    /// `HDEL key field [field ...]`. Returns count actually removed.
220    pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
221        let mut g = self.wshard(key);
222        let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
223        let removed = g.store.hdel(key, &owned).map_err(store_err)?;
224        if removed > 0 {
225            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
226            parts.push(b"HDEL");
227            parts.push(key);
228            for f in fields {
229                parts.push(f);
230            }
231            commit_write(&mut g, &parts)?;
232        }
233        Ok(removed)
234    }
235
236    // ---- list ops -------------------------------------------------------
237
238    /// `LPUSH key value [value ...]`. Returns the new list length.
239    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
240        push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
241    }
242
243    /// `RPUSH key value [value ...]`. Returns the new list length.
244    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
245        push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
246    }
247
248    /// `LPOP key count`. Returns popped values from the head.
249    pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
250        pop_helper(self, key, count, false)
251    }
252
253    /// `RPOP key count`. Symmetric to `LPOP` from the tail.
254    pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
255        pop_helper(self, key, count, true)
256    }
257
258    /// `LLEN key`. Length of the list at `key`; 0 if absent.
259    pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
260        self.wshard(key).store.llen(key).map_err(store_err)
261    }
262
263    // ---- set ops --------------------------------------------------------
264
265    /// `SADD key member [member ...]`. Returns count newly added.
266    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
267        push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
268    }
269
270    /// `SREM key member [member ...]`. Returns count actually removed.
271    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
272        let mut g = self.wshard(key);
273        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
274        let removed = g.store.srem(key, &owned).map_err(store_err)?;
275        if removed > 0 {
276            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
277            parts.push(b"SREM");
278            parts.push(key);
279            for m in members {
280                parts.push(m);
281            }
282            commit_write(&mut g, &parts)?;
283        }
284        Ok(removed)
285    }
286
287    /// `SMEMBERS key`. Order implementation-defined; empty if absent.
288    pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
289        self.wshard(key).store.smembers(key).map_err(store_err)
290    }
291
292    /// `SCARD key`. Member count; 0 if absent.
293    pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
294        self.wshard(key).store.scard(key).map_err(store_err)
295    }
296
297    // ---- zset ops -------------------------------------------------------
298
299    /// `ZADD key score member [score member ...]`. Returns count newly added.
300    pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
301        let mut g = self.wshard(key);
302        let owned: Vec<(f64, Vec<u8>)> =
303            pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
304        let added = g.store.zadd(key, &owned).map_err(store_err)?;
305        let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
306        for (s, _) in pairs {
307            score_strs.push(format!("{s}").into_bytes());
308        }
309        let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
310        parts.push(b"ZADD");
311        parts.push(key);
312        for (i, (_, m)) in pairs.iter().enumerate() {
313            parts.push(&score_strs[i]);
314            parts.push(m);
315        }
316        commit_write(&mut g, &parts)?;
317        Ok(added)
318    }
319
320    /// `ZREM key member [member ...]`. Returns count actually removed.
321    pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
322        let mut g = self.wshard(key);
323        let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
324        let removed = g.store.zrem(key, &owned).map_err(store_err)?;
325        if removed > 0 {
326            let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
327            parts.push(b"ZREM");
328            parts.push(key);
329            for m in members {
330                parts.push(m);
331            }
332            commit_write(&mut g, &parts)?;
333        }
334        Ok(removed)
335    }
336
337    /// `ZSCORE key member`. `Some(score)` if present.
338    pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
339        self.wshard(key).store.zscore(key, member).map_err(store_err)
340    }
341
342    /// `ZCARD key`. Member count; 0 if absent.
343    pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
344        self.wshard(key).store.zcard(key).map_err(store_err)
345    }
346
347    // ---- pub/sub --------------------------------------------------------
348
349    /// `PUBLISH channel payload`. Delivers `payload` to every subscriber on
350    /// `channel` (direct + pattern matches) inside this process. Returns
351    /// the count of receivers the message reached.
352    pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
353        // Clone matching senders under the lock, then release before
354        // send() so a slow receiver can't stall unrelated traffic.
355        let plans = {
356            // Pub/sub is process-wide; the bus lives on shard 0.
357            let g = self.lock();
358            g.bus.collect_delivery(channel, payload)
359        };
360        let mut count = 0;
361        for (frame, sender) in plans {
362            if sender.send(frame).is_ok() {
363                count += 1;
364            }
365        }
366        count
367    }
368
369    /// Open a [`Subscription`] subscribed to `channels`. Drop the handle
370    /// to unsubscribe from everything atomically. Pass `&[]` to start
371    /// with no subscriptions and add some later via
372    /// [`Subscription::subscribe`] / [`Subscription::psubscribe`].
373    pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
374        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
375        if !channels.is_empty() {
376            sub.subscribe(channels);
377        }
378        sub
379    }
380
381    /// Convenience: open a [`Subscription`] starting on pattern subscriptions.
382    pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
383        let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
384        if !patterns.is_empty() {
385            sub.psubscribe(patterns);
386        }
387        sub
388    }
389}
390
391// ─────────────────────────────────────────────────────────────────────────
392// Shared list/set push + list pop helpers. `&Store` so we can lock + AOF-log.
393// ─────────────────────────────────────────────────────────────────────────
394
395fn push_helper<F>(
396    s: &Store,
397    key: &[u8],
398    values: &[&[u8]],
399    verb: &'static [u8],
400    op: F,
401) -> io::Result<usize>
402where
403    F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
404{
405    let mut g = s.wshard(key);
406    let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
407    let n = op(&mut g.store, key, &owned).map_err(store_err)?;
408    let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
409    parts.push(verb);
410    parts.push(key);
411    for v in values {
412        parts.push(v);
413    }
414    commit_write(&mut g, &parts)?;
415    Ok(n)
416}
417
418fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
419    let mut g = s.wshard(key);
420    let popped = if from_tail {
421        g.store.rpop(key, count).map_err(store_err)?
422    } else {
423        g.store.lpop(key, count).map_err(store_err)?
424    };
425    if !popped.is_empty() {
426        let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
427        let count_str = popped.len().to_string();
428        let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
429        commit_write(&mut g, &parts)?;
430    }
431    Ok(popped)
432}