Skip to main content

kevy_client/
lib.rs

1//! kevy-client — unified KV facade so downstream code can switch between
2//! in-process embedded and TCP-server backends with one URL string.
3//!
4//! ```no_run
5//! use kevy_client::Connection;
6//!
7//! // Same business code regardless of backend:
8//! let mut conn = Connection::open(std::env::var("MY_KEVY_URL").unwrap().as_str())?;
9//! conn.set(b"hello", b"world")?;
10//! assert_eq!(conn.get(b"hello")?, Some(b"world".to_vec()));
11//! # Ok::<(), std::io::Error>(())
12//! ```
13//!
14//! URL schemes:
15//! - `mem://`                       — in-process embedded, in-memory only
16//! - `mem://<name>`                 — shared in-process bus keyed by `<name>`
17//! - `file:///abs/path` /
18//!   `file://./rel/path`            — in-process embedded with persistence
19//! - `kevy://host[:port][/db]`      — TCP RESP, kevy-native scheme
20//! - `redis://host[:port][/db]`     — TCP RESP, standard Redis URL (alias)
21//! - `tcp://host[:port]`            — TCP RESP, raw (no SELECT round-trip)
22//!
23//! Auth (`redis://user:pass@…`) and TLS (`rediss://`) are rejected up front
24//! — kevy ships without either. v1.1.0 added the full string/hash/list/set/
25//! zset + one-shot `PUBLISH` surface. v1.2.0 added the pub/sub *consumer*
26//! side as a separate [`Subscriber`] type — a subscribed connection cannot
27//! send normal commands, so it needs its own socket and lives outside the
28//! `Connection` enum. v1.3.0 routes `mem://<name>` / `file:///path` through
29//! a process-local registry so the publisher and consumer can find each
//! other when both opens use the same URL. On the trait-vs-enum design
//! question, the crate uses an enum for now (closed two-backend universe);
//! see ROADMAP for the trait extension path.
33
34#![forbid(unsafe_code)]
35#![warn(missing_docs)]
36
37use std::io;
38use std::time::Duration;
39
40use kevy_embedded::Store;
41use kevy_resp::Reply;
42use kevy_resp_client::RespClient;
43
44mod collections;
45mod scan;
46mod subscribe;
47mod transaction;
48mod url;
49
50pub use subscribe::{PubsubEvent, Subscriber};
51pub use transaction::Transaction;
52
53pub(crate) use url::{Target, parse_url, resolve_store};
54
/// One open connection to a kevy backend, opaque about whether the backend
/// is in-process or over TCP.
///
/// The command methods in this file match on the variant and either call
/// the embedded store directly or send the corresponding RESP command
/// through the TCP client.
pub enum Connection {
    /// In-process [`kevy_embedded::Store`]; commands are direct method calls.
    Embedded(Store),
    /// TCP [`kevy_resp_client::RespClient`]; commands travel as RESP requests.
    Remote(RespClient),
}
63
64impl Connection {
65    /// Open a backend chosen by URL scheme.
66    ///
67    /// See the crate-level docs for the supported URL forms. From v1.3.0,
68    /// two `Connection::open` calls with the same `mem://<name>` or
69    /// `file:///path` URL share the same backing `Store` — and the same
70    /// pub/sub bus, so `Connection::publish` reaches a `Subscriber::open`
71    /// opened with the same URL.
72    pub fn open(url: &str) -> io::Result<Self> {
73        let parsed = parse_url(url)?;
74        match parsed {
75            Target::Remote(remote_url) => Ok(Self::Remote(RespClient::from_url(&remote_url)?)),
76            embed => Ok(Self::Embedded(resolve_store(&embed)?)),
77        }
78    }
79
80    /// `PING`. Returns `()` on `+PONG`, propagates any IO or RESP error.
81    /// The first thing every healthcheck calls.
82    pub fn ping(&mut self) -> io::Result<()> {
83        match self {
84            Self::Embedded(_) => Ok(()),
85            Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
86                Reply::Simple(s) if s == b"PONG" => Ok(()),
87                Reply::Error(e) => Err(io::Error::other(string(e))),
88                other => Err(unexpected(other)),
89            },
90        }
91    }
92
93    /// `SET key value`. Unconditional set (no NX/XX). Returns `()` on success.
94    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
95        match self {
96            Self::Embedded(s) => s.set(key, value).map(|_| ()),
97            Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
98                Reply::Simple(s) if s == b"OK" => Ok(()),
99                Reply::Error(e) => Err(io::Error::other(string(e))),
100                other => Err(unexpected(other)),
101            },
102        }
103    }
104
105    /// `GET key`. `None` if absent or expired.
106    pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
107        match self {
108            Self::Embedded(s) => s.get(key),
109            Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
110                Reply::Bulk(v) => Ok(Some(v)),
111                Reply::Nil => Ok(None),
112                Reply::Error(e) => Err(io::Error::other(string(e))),
113                other => Err(unexpected(other)),
114            },
115        }
116    }
117
118    /// `DEL key [key ...]`. Returns the count of keys that were actually
119    /// removed (existing + dropped). Missing keys don't contribute.
120    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
121        match self {
122            Self::Embedded(s) => s.del(keys),
123            Self::Remote(c) => {
124                let mut args = Vec::with_capacity(keys.len() + 1);
125                args.push(b"DEL".to_vec());
126                args.extend(keys.iter().map(|k| k.to_vec()));
127                match c.request(&args)? {
128                    Reply::Int(n) if n >= 0 => Ok(n as usize),
129                    Reply::Error(e) => Err(io::Error::other(string(e))),
130                    other => Err(unexpected(other)),
131                }
132            }
133        }
134    }
135
136    /// `EXISTS key [key ...]`. Count of keys present (a single key can
137    /// contribute >1 if passed multiple times, matching Redis semantics).
138    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
139        match self {
140            Self::Embedded(s) => s.exists(keys),
141            Self::Remote(c) => {
142                let mut args = Vec::with_capacity(keys.len() + 1);
143                args.push(b"EXISTS".to_vec());
144                args.extend(keys.iter().map(|k| k.to_vec()));
145                match c.request(&args)? {
146                    Reply::Int(n) if n >= 0 => Ok(n as usize),
147                    Reply::Error(e) => Err(io::Error::other(string(e))),
148                    other => Err(unexpected(other)),
149                }
150            }
151        }
152    }
153
154    /// `INCR key`. Returns the post-increment value. Errors on non-integer
155    /// stored value.
156    pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
157        match self {
158            Self::Embedded(s) => s.incr(key),
159            Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
160                Reply::Int(n) => Ok(n),
161                Reply::Error(e) => Err(io::Error::other(string(e))),
162                other => Err(unexpected(other)),
163            },
164        }
165    }
166
167    /// `INCRBY key delta`. Negative delta is `DECRBY`. Returns post-value.
168    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
169        match self {
170            Self::Embedded(s) => s.incr_by(key, delta),
171            Self::Remote(c) => {
172                let args = vec![
173                    b"INCRBY".to_vec(),
174                    key.to_vec(),
175                    delta.to_string().into_bytes(),
176                ];
177                match c.request(&args)? {
178                    Reply::Int(n) => Ok(n),
179                    Reply::Error(e) => Err(io::Error::other(string(e))),
180                    other => Err(unexpected(other)),
181                }
182            }
183        }
184    }
185
186    /// `PEXPIRE key ttl_ms`. Returns whether the key existed and got a TTL.
187    pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
188        match self {
189            Self::Embedded(s) => s.expire(key, ttl),
190            Self::Remote(c) => {
191                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
192                let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
193                match c.request(&args)? {
194                    Reply::Int(1) => Ok(true),
195                    Reply::Int(0) => Ok(false),
196                    Reply::Error(e) => Err(io::Error::other(string(e))),
197                    other => Err(unexpected(other)),
198                }
199            }
200        }
201    }
202
203    /// `PERSIST key`. Returns whether a TTL was actually removed.
204    pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
205        match self {
206            Self::Embedded(s) => s.persist(key),
207            Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
208                Reply::Int(1) => Ok(true),
209                Reply::Int(0) => Ok(false),
210                Reply::Error(e) => Err(io::Error::other(string(e))),
211                other => Err(unexpected(other)),
212            },
213        }
214    }
215
216    /// `PTTL key`. Returns ms remaining, -2 if no key, -1 if key has no TTL.
217    pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
218        match self {
219            Self::Embedded(s) => Ok(s.ttl_ms(key)),
220            Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
221                Reply::Int(n) => Ok(n),
222                Reply::Error(e) => Err(io::Error::other(string(e))),
223                other => Err(unexpected(other)),
224            },
225        }
226    }
227
228    /// `TYPE key`. Returns the value's type as a Redis-style string (e.g.
229    /// `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"` if
230    /// the key doesn't exist).
231    pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
232        match self {
233            Self::Embedded(s) => Ok(s.type_of(key).to_string()),
234            Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
235                Reply::Simple(s) => Ok(string(s)),
236                Reply::Error(e) => Err(io::Error::other(string(e))),
237                other => Err(unexpected(other)),
238            },
239        }
240    }
241
242    /// `DBSIZE`. Total live keys at the time of the call.
243    pub fn dbsize(&mut self) -> io::Result<usize> {
244        match self {
245            Self::Embedded(s) => Ok(s.dbsize()),
246            Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
247                Reply::Int(n) if n >= 0 => Ok(n as usize),
248                Reply::Error(e) => Err(io::Error::other(string(e))),
249                other => Err(unexpected(other)),
250            },
251        }
252    }
253
254    /// `FLUSHDB`. Drops every key. Persistence remains opted-in; embedded
255    /// `with_persist` will rewrite the AOF on its next sync cycle.
256    pub fn flush(&mut self) -> io::Result<()> {
257        match self {
258            Self::Embedded(s) => s.flush(),
259            Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
260                Reply::Simple(s) if s == b"OK" => Ok(()),
261                Reply::Error(e) => Err(io::Error::other(string(e))),
262                other => Err(unexpected(other)),
263            },
264        }
265    }
266
267    /// `SET key value PX ttl_ms`. Convenience for the common
268    /// "cache with expiry" pattern; equivalent to `set` + `expire` but
269    /// atomic.
270    pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
271        match self {
272            Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
273            Self::Remote(c) => {
274                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
275                let args = vec![
276                    b"SET".to_vec(),
277                    key.to_vec(),
278                    value.to_vec(),
279                    b"PX".to_vec(),
280                    ms.to_string().into_bytes(),
281                ];
282                match c.request(&args)? {
283                    Reply::Simple(s) if s == b"OK" => Ok(()),
284                    Reply::Error(e) => Err(io::Error::other(string(e))),
285                    other => Err(unexpected(other)),
286                }
287            }
288        }
289    }
290
291    /// `MGET key [key ...]` — one reply per key, `None` for missing /
292    /// wrong-type. Returns in the same order as `keys`.
293    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
294        match self {
295            Self::Embedded(s) => keys.iter().map(|k| s.get(k)).collect(),
296            Self::Remote(c) => {
297                let mut args = Vec::with_capacity(keys.len() + 1);
298                args.push(b"MGET".to_vec());
299                args.extend(keys.iter().map(|k| k.to_vec()));
300                match c.request(&args)? {
301                    Reply::Array(items) => items
302                        .into_iter()
303                        .map(|r| match r {
304                            Reply::Bulk(v) => Ok(Some(v)),
305                            Reply::Nil => Ok(None),
306                            other => Err(unexpected(other)),
307                        })
308                        .collect(),
309                    Reply::Error(e) => Err(io::Error::other(string(e))),
310                    other => Err(unexpected(other)),
311                }
312            }
313        }
314    }
315
316    /// `MSET key value [key value ...]` — set every pair atomically.
317    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
318        match self {
319            Self::Embedded(s) => {
320                for (k, v) in pairs {
321                    s.set(k, v)?;
322                }
323                Ok(())
324            }
325            Self::Remote(c) => {
326                let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
327                args.push(b"MSET".to_vec());
328                for (k, v) in pairs {
329                    args.push(k.to_vec());
330                    args.push(v.to_vec());
331                }
332                match c.request(&args)? {
333                    Reply::Simple(s) if s == b"OK" => Ok(()),
334                    Reply::Error(e) => Err(io::Error::other(string(e))),
335                    other => Err(unexpected(other)),
336                }
337            }
338        }
339    }
340
341    /// `PUBLISH channel message`. Returns the count of subscribers
342    /// that received the message.
343    ///
344    /// As of v1.3.0, the embedded backend has a real in-process pub/sub
345    /// bus: when a [`Subscriber`] is open against the same `mem://<name>`
346    /// or `file:///path` URL, this delivers there and returns the actual
347    /// receiver count. Anonymous `mem://` keeps the old "no subscribers,
348    /// returns 0" behaviour (the URL is its own bus, by design).
349    ///
350    /// The pub/sub *consumer* side lives in [`Subscriber`]. On the remote
351    /// backend a subscribed TCP connection cannot send normal commands
352    /// per the RESP spec; the embedded backend has no such restriction
353    /// but `Subscriber` is still a distinct type for API symmetry.
354    pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
355        match self {
356            Self::Embedded(s) => Ok(s.publish(channel, message)),
357            Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
358                Reply::Int(n) if n >= 0 => Ok(n as usize),
359                Reply::Error(e) => Err(io::Error::other(string(e))),
360                other => Err(unexpected(other)),
361            },
362        }
363    }
364}
365
366// ─────────────────────────────────────────────────────────────────────────
367// Crate-internal helpers, used here + by `collections.rs` + `subscribe.rs`.
368// ─────────────────────────────────────────────────────────────────────────
369
/// Build the owned argument list for a two-part command: the verb followed
/// by one argument, each copied into its own `Vec<u8>`.
pub(crate) fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
    [verb, a].iter().map(|part| part.to_vec()).collect()
}
373
/// Build the owned argument list for a three-part command: the verb
/// followed by two arguments, each copied into its own `Vec<u8>`.
pub(crate) fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
    [verb, a, b].iter().map(|part| part.to_vec()).collect()
}
377
/// Turn raw reply bytes into a `String`, substituting U+FFFD for any
/// byte sequence that is not valid UTF-8.
pub(crate) fn string(b: Vec<u8>) -> String {
    match String::from_utf8(b) {
        // Already valid UTF-8: reuse the buffer as-is.
        Ok(text) => text,
        // Otherwise fall back to the lossy conversion of the same bytes.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
381
382pub(crate) fn unexpected(r: Reply) -> io::Error {
383    let kind = match r {
384        Reply::Simple(_) => "simple-string",
385        Reply::Error(_) => "error",
386        Reply::Int(_) => "integer",
387        Reply::Bulk(_) => "bulk-string",
388        Reply::Nil => "nil",
389        Reply::Array(_) => "array",
390    };
391    io::Error::other(format!("unexpected RESP reply variant: {kind}"))
392}
393
394pub(crate) fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
395    items
396        .into_iter()
397        .map(|r| match r {
398            Reply::Bulk(v) => Ok(v),
399            Reply::Simple(v) => Ok(v),
400            Reply::Nil => Ok(Vec::new()),
401            other => Err(unexpected(other)),
402        })
403        .collect()
404}
405
406pub(crate) fn store_err(e: kevy_embedded::StoreError) -> io::Error {
407    io::Error::other(format!("kevy-store: {e:?}"))
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test on the embedded backend covering each generic + string
    /// method that delegates to `Store`. Collection types (hash/list/set/
    /// zset) are covered in `collections::tests`.
    #[test]
    fn embedded_mem_full_crud_round_trip() {
        let mut conn = Connection::open("mem://").unwrap();
        conn.ping().unwrap();

        // set / get / del on a single key.
        conn.set(b"k", b"v").unwrap();
        let fetched = conn.get(b"k").unwrap();
        assert_eq!(fetched, Some(b"v".to_vec()));

        let removed = conn.del(&[&b"k"[..], &b"missing"[..]]).unwrap();
        assert_eq!(removed, 1);
        assert_eq!(conn.get(b"k").unwrap(), None);

        // exists counts only the keys that are present.
        conn.set(b"a", b"1").unwrap();
        conn.set(b"b", b"2").unwrap();
        let present = conn
            .exists(&[&b"a"[..], &b"b"[..], &b"none"[..]])
            .unwrap();
        assert_eq!(present, 2);

        // Counters start from an absent key.
        assert_eq!(conn.incr(b"counter").unwrap(), 1);
        assert_eq!(conn.incr_by(b"counter", 9).unwrap(), 10);

        // TTL lifecycle: expire, inspect, persist.
        conn.set(b"timed", b"x").unwrap();
        assert!(conn.expire(b"timed", Duration::from_secs(60)).unwrap());
        let ttl = conn.ttl_ms(b"timed").unwrap();
        assert!((0..=60_000).contains(&ttl), "ttl_ms = {ttl}");
        assert!(conn.persist(b"timed").unwrap());
        assert_eq!(conn.ttl_ms(b"timed").unwrap(), -1);

        assert_eq!(conn.type_of(b"none").unwrap(), "none");
        assert_eq!(conn.type_of(b"timed").unwrap(), "string");

        // flush empties the keyspace.
        assert!(conn.dbsize().unwrap() >= 3);
        conn.flush().unwrap();
        assert_eq!(conn.dbsize().unwrap(), 0);

        conn.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
            .unwrap();
        let ttl = conn.ttl_ms(b"timed2").unwrap();
        assert!((0..=60_000).contains(&ttl));
    }

    #[test]
    fn anonymous_mem_publish_returns_zero() {
        // Anonymous `mem://` has no bus and no subscribers — by design.
        let mut conn = Connection::open("mem://").unwrap();
        let receivers = conn.publish(b"chan", b"hi").unwrap();
        assert_eq!(receivers, 0);
    }

    #[test]
    fn embedded_mget_mset() {
        let mut conn = Connection::open("mem://").unwrap();
        let pairs = [
            (b"a".as_ref(), b"1".as_ref()),
            (b"b".as_ref(), b"2".as_ref()),
        ];
        conn.mset(&pairs).unwrap();

        let got = conn
            .mget(&[&b"a"[..], &b"b"[..], &b"missing"[..]])
            .unwrap();
        let expected = vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None];
        assert_eq!(got, expected);
    }

    #[test]
    fn embedded_multi_rejected_unsupported() {
        let mut conn = Connection::open("mem://").unwrap();
        let err = conn.multi().unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Unsupported);
    }
}