Skip to main content

kevy_client/
lib.rs

1//! kevy-client — unified KV facade so downstream code can switch between
2//! in-process embedded and TCP-server backends with one URL string.
3//!
4//! ```no_run
5//! use kevy_client::Connection;
6//!
7//! // Same business code regardless of backend:
8//! let mut conn = Connection::open(std::env::var("MY_KEVY_URL").unwrap().as_str())?;
9//! conn.set(b"hello", b"world")?;
10//! assert_eq!(conn.get(b"hello")?, Some(b"world".to_vec()));
11//! # Ok::<(), std::io::Error>(())
12//! ```
13//!
14//! URL schemes:
15//! - `mem://`                       — in-process embedded, in-memory only
16//! - `mem://<name>`                 — shared in-process bus keyed by `<name>`
17//! - `file:///abs/path` /
18//!   `file://./rel/path`            — in-process embedded with persistence
19//! - `kevy://host[:port][/db]`      — TCP RESP, kevy-native scheme
20//! - `redis://host[:port][/db]`     — TCP RESP, standard Redis URL (alias)
21//! - `tcp://host[:port]`            — TCP RESP, raw (no SELECT round-trip)
22//!
23//! Auth (`redis://user:pass@…`) and TLS (`rediss://`) are rejected up front
24//! — kevy ships without either. v1.1.0 added the full string/hash/list/set/
25//! zset + one-shot `PUBLISH` surface. v1.2.0 added the pub/sub *consumer*
26//! side as a separate [`Subscriber`] type — a subscribed connection cannot
27//! send normal commands, so it needs its own socket and lives outside the
28//! `Connection` enum. v1.3.0 routes `mem://<name>` / `file:///path` through
29//! a process-local registry so the publisher and consumer can find each
30//! other when both opens use the same URL. The trait-vs-enum design
31//! decision is enum for now (closed two-backend universe); see ROADMAP
32//! for the trait extension path.
33
34#![forbid(unsafe_code)]
35#![warn(missing_docs)]
36
37use std::io;
38use std::time::Duration;
39
40use kevy_embedded::Store;
41use kevy_resp::Reply;
42use kevy_resp_client::RespClient;
43
44mod collections;
45mod scan;
46mod subscribe;
47mod subscribe_io;
48mod transaction;
49mod url;
50
51pub use subscribe::{PubsubEvent, Subscriber, SubscriberEvents, SubscriberMessages};
52pub use transaction::{Transaction, TransactionReplies};
53
54pub(crate) use url::{Target, parse_url, resolve_store};
55
56/// One open connection to a kevy backend, opaque about whether the backend
57/// is in-process or over TCP.
58pub enum Connection {
59    /// In-process [`kevy_embedded::Store`].
60    Embedded(Store),
61    /// TCP [`kevy_resp_client::RespClient`].
62    Remote(RespClient),
63}
64
65impl Connection {
66    /// Open a backend chosen by URL scheme.
67    ///
68    /// See the crate-level docs for the supported URL forms. From v1.3.0,
69    /// two `Connection::open` calls with the same `mem://<name>` or
70    /// `file:///path` URL share the same backing `Store` — and the same
71    /// pub/sub bus, so `Connection::publish` reaches a `Subscriber::open`
72    /// opened with the same URL.
73    pub fn open(url: &str) -> io::Result<Self> {
74        let parsed = parse_url(url)?;
75        match parsed {
76            Target::Remote(remote_url) => Ok(Self::Remote(RespClient::from_url(&remote_url)?)),
77            embed => Ok(Self::Embedded(resolve_store(&embed)?)),
78        }
79    }
80
81    /// `PING`. Returns `()` on `+PONG`, propagates any IO or RESP error.
82    /// The first thing every healthcheck calls.
83    pub fn ping(&mut self) -> io::Result<()> {
84        match self {
85            Self::Embedded(_) => Ok(()),
86            Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
87                Reply::Simple(s) if s == b"PONG" => Ok(()),
88                Reply::Error(e) => Err(io::Error::other(string(e))),
89                other => Err(unexpected(other)),
90            },
91        }
92    }
93
94    /// `SET key value`. Unconditional set (no NX/XX). Returns `()` on success.
95    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
96        match self {
97            Self::Embedded(s) => s.set(key, value).map(|_| ()),
98            Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
99                Reply::Simple(s) if s == b"OK" => Ok(()),
100                Reply::Error(e) => Err(io::Error::other(string(e))),
101                other => Err(unexpected(other)),
102            },
103        }
104    }
105
106    /// `GET key`. `None` if absent or expired.
107    pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
108        match self {
109            Self::Embedded(s) => s.get(key),
110            Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
111                Reply::Bulk(v) => Ok(Some(v)),
112                Reply::Nil => Ok(None),
113                Reply::Error(e) => Err(io::Error::other(string(e))),
114                other => Err(unexpected(other)),
115            },
116        }
117    }
118
119    /// `DEL key [key ...]`. Returns the count of keys that were actually
120    /// removed (existing + dropped). Missing keys don't contribute.
121    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
122        match self {
123            Self::Embedded(s) => s.del(keys),
124            Self::Remote(c) => {
125                let mut args = Vec::with_capacity(keys.len() + 1);
126                args.push(b"DEL".to_vec());
127                args.extend(keys.iter().map(|k| k.to_vec()));
128                match c.request(&args)? {
129                    Reply::Int(n) if n >= 0 => Ok(n as usize),
130                    Reply::Error(e) => Err(io::Error::other(string(e))),
131                    other => Err(unexpected(other)),
132                }
133            }
134        }
135    }
136
137    /// `EXISTS key [key ...]`. Count of keys present (a single key can
138    /// contribute >1 if passed multiple times, matching Redis semantics).
139    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
140        match self {
141            Self::Embedded(s) => s.exists(keys),
142            Self::Remote(c) => {
143                let mut args = Vec::with_capacity(keys.len() + 1);
144                args.push(b"EXISTS".to_vec());
145                args.extend(keys.iter().map(|k| k.to_vec()));
146                match c.request(&args)? {
147                    Reply::Int(n) if n >= 0 => Ok(n as usize),
148                    Reply::Error(e) => Err(io::Error::other(string(e))),
149                    other => Err(unexpected(other)),
150                }
151            }
152        }
153    }
154
155    /// `INCR key`. Returns the post-increment value. Errors on non-integer
156    /// stored value.
157    pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
158        match self {
159            Self::Embedded(s) => s.incr(key),
160            Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
161                Reply::Int(n) => Ok(n),
162                Reply::Error(e) => Err(io::Error::other(string(e))),
163                other => Err(unexpected(other)),
164            },
165        }
166    }
167
168    /// `INCRBY key delta`. Negative delta is `DECRBY`. Returns post-value.
169    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
170        match self {
171            Self::Embedded(s) => s.incr_by(key, delta),
172            Self::Remote(c) => {
173                let args = vec![
174                    b"INCRBY".to_vec(),
175                    key.to_vec(),
176                    delta.to_string().into_bytes(),
177                ];
178                match c.request(&args)? {
179                    Reply::Int(n) => Ok(n),
180                    Reply::Error(e) => Err(io::Error::other(string(e))),
181                    other => Err(unexpected(other)),
182                }
183            }
184        }
185    }
186
187    /// `PEXPIRE key ttl_ms`. Returns whether the key existed and got a TTL.
188    pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
189        match self {
190            Self::Embedded(s) => s.expire(key, ttl),
191            Self::Remote(c) => {
192                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
193                let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
194                match c.request(&args)? {
195                    Reply::Int(1) => Ok(true),
196                    Reply::Int(0) => Ok(false),
197                    Reply::Error(e) => Err(io::Error::other(string(e))),
198                    other => Err(unexpected(other)),
199                }
200            }
201        }
202    }
203
204    /// `PERSIST key`. Returns whether a TTL was actually removed.
205    pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
206        match self {
207            Self::Embedded(s) => s.persist(key),
208            Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
209                Reply::Int(1) => Ok(true),
210                Reply::Int(0) => Ok(false),
211                Reply::Error(e) => Err(io::Error::other(string(e))),
212                other => Err(unexpected(other)),
213            },
214        }
215    }
216
217    /// `PTTL key`. Returns ms remaining, -2 if no key, -1 if key has no TTL.
218    pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
219        match self {
220            Self::Embedded(s) => Ok(s.ttl_ms(key)),
221            Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
222                Reply::Int(n) => Ok(n),
223                Reply::Error(e) => Err(io::Error::other(string(e))),
224                other => Err(unexpected(other)),
225            },
226        }
227    }
228
229    /// `TYPE key`. Returns the value's type as a Redis-style string (e.g.
230    /// `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"` if
231    /// the key doesn't exist).
232    pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
233        match self {
234            Self::Embedded(s) => Ok(s.type_of(key).to_string()),
235            Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
236                Reply::Simple(s) => Ok(string(s)),
237                Reply::Error(e) => Err(io::Error::other(string(e))),
238                other => Err(unexpected(other)),
239            },
240        }
241    }
242
243    /// `DBSIZE`. Total live keys at the time of the call.
244    pub fn dbsize(&mut self) -> io::Result<usize> {
245        match self {
246            Self::Embedded(s) => Ok(s.dbsize()),
247            Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
248                Reply::Int(n) if n >= 0 => Ok(n as usize),
249                Reply::Error(e) => Err(io::Error::other(string(e))),
250                other => Err(unexpected(other)),
251            },
252        }
253    }
254
255    /// `FLUSHDB`. Drops every key. Persistence remains opted-in; embedded
256    /// `with_persist` will rewrite the AOF on its next sync cycle.
257    pub fn flush(&mut self) -> io::Result<()> {
258        match self {
259            Self::Embedded(s) => s.flush(),
260            Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
261                Reply::Simple(s) if s == b"OK" => Ok(()),
262                Reply::Error(e) => Err(io::Error::other(string(e))),
263                other => Err(unexpected(other)),
264            },
265        }
266    }
267
268    /// `SET key value PX ttl_ms`. Convenience for the common
269    /// "cache with expiry" pattern; equivalent to `set` + `expire` but
270    /// atomic.
271    pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
272        match self {
273            Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
274            Self::Remote(c) => {
275                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
276                let args = vec![
277                    b"SET".to_vec(),
278                    key.to_vec(),
279                    value.to_vec(),
280                    b"PX".to_vec(),
281                    ms.to_string().into_bytes(),
282                ];
283                match c.request(&args)? {
284                    Reply::Simple(s) if s == b"OK" => Ok(()),
285                    Reply::Error(e) => Err(io::Error::other(string(e))),
286                    other => Err(unexpected(other)),
287                }
288            }
289        }
290    }
291
292    /// `MGET key [key ...]` — one reply per key, `None` for missing /
293    /// wrong-type. Returns in the same order as `keys`.
294    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
295        match self {
296            Self::Embedded(s) => keys.iter().map(|k| s.get(k)).collect(),
297            Self::Remote(c) => {
298                let mut args = Vec::with_capacity(keys.len() + 1);
299                args.push(b"MGET".to_vec());
300                args.extend(keys.iter().map(|k| k.to_vec()));
301                match c.request(&args)? {
302                    Reply::Array(items) => items
303                        .into_iter()
304                        .map(|r| match r {
305                            Reply::Bulk(v) => Ok(Some(v)),
306                            Reply::Nil => Ok(None),
307                            other => Err(unexpected(other)),
308                        })
309                        .collect(),
310                    Reply::Error(e) => Err(io::Error::other(string(e))),
311                    other => Err(unexpected(other)),
312                }
313            }
314        }
315    }
316
317    /// `MSET key value [key value ...]` — set every pair atomically.
318    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
319        match self {
320            Self::Embedded(s) => {
321                for (k, v) in pairs {
322                    s.set(k, v)?;
323                }
324                Ok(())
325            }
326            Self::Remote(c) => {
327                let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
328                args.push(b"MSET".to_vec());
329                for (k, v) in pairs {
330                    args.push(k.to_vec());
331                    args.push(v.to_vec());
332                }
333                match c.request(&args)? {
334                    Reply::Simple(s) if s == b"OK" => Ok(()),
335                    Reply::Error(e) => Err(io::Error::other(string(e))),
336                    other => Err(unexpected(other)),
337                }
338            }
339        }
340    }
341
342    /// `PUBLISH channel message`. Returns the count of subscribers
343    /// that received the message.
344    ///
345    /// As of v1.3.0, the embedded backend has a real in-process pub/sub
346    /// bus: when a [`Subscriber`] is open against the same `mem://<name>`
347    /// or `file:///path` URL, this delivers there and returns the actual
348    /// receiver count. Anonymous `mem://` keeps the old "no subscribers,
349    /// returns 0" behaviour (the URL is its own bus, by design).
350    ///
351    /// The pub/sub *consumer* side lives in [`Subscriber`]. On the remote
352    /// backend a subscribed TCP connection cannot send normal commands
353    /// per the RESP spec; the embedded backend has no such restriction
354    /// but `Subscriber` is still a distinct type for API symmetry.
355    pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
356        match self {
357            Self::Embedded(s) => Ok(s.publish(channel, message)),
358            Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
359                Reply::Int(n) if n >= 0 => Ok(n as usize),
360                Reply::Error(e) => Err(io::Error::other(string(e))),
361                other => Err(unexpected(other)),
362            },
363        }
364    }
365}
366
367// ─────────────────────────────────────────────────────────────────────────
368// Crate-internal helpers, used here + by `collections.rs` + `subscribe.rs`.
369// ─────────────────────────────────────────────────────────────────────────
370
371pub(crate) fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
372    vec![verb.to_vec(), a.to_vec()]
373}
374
375pub(crate) fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
376    vec![verb.to_vec(), a.to_vec(), b.to_vec()]
377}
378
379pub(crate) fn string(b: Vec<u8>) -> String {
380    String::from_utf8_lossy(&b).into_owned()
381}
382
383pub(crate) fn unexpected(r: Reply) -> io::Error {
384    let kind = match r {
385        Reply::Simple(_) => "simple-string",
386        Reply::Error(_) => "error",
387        Reply::Int(_) => "integer",
388        Reply::Bulk(_) => "bulk-string",
389        Reply::Nil | Reply::Null => "nil",
390        Reply::Array(_) => "array",
391        Reply::Map(_) => "map",
392        Reply::Set(_) => "set",
393        Reply::Double(_) => "double",
394        Reply::Boolean(_) => "boolean",
395        Reply::Verbatim { .. } => "verbatim-string",
396        Reply::BigNumber(_) => "big-number",
397        Reply::Push(_) => "push",
398        Reply::BlobError(_) => "blob-error",
399    };
400    io::Error::other(format!("unexpected RESP reply variant: {kind}"))
401}
402
403pub(crate) fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
404    items
405        .into_iter()
406        .map(|r| match r {
407            Reply::Bulk(v) => Ok(v),
408            Reply::Simple(v) => Ok(v),
409            Reply::Nil => Ok(Vec::new()),
410            other => Err(unexpected(other)),
411        })
412        .collect()
413}
414
415pub(crate) fn store_err(e: kevy_embedded::StoreError) -> io::Error {
416    io::Error::other(format!("kevy-store: {e:?}"))
417}
418
419#[cfg(test)]
420mod tests {
421    use super::*;
422
423    /// Smoke against the embedded backend: every generic + string method
424    /// delegating to `Store`. Per-collection coverage (hash/list/set/zset)
425    /// lives in `collections::tests`.
426    #[test]
427    fn embedded_mem_full_crud_round_trip() {
428        let mut c = Connection::open("mem://").unwrap();
429        c.ping().unwrap();
430
431        c.set(b"k", b"v").unwrap();
432        assert_eq!(c.get(b"k").unwrap(), Some(b"v".to_vec()));
433
434        assert_eq!(c.del(&[&b"k"[..], &b"missing"[..]]).unwrap(), 1);
435        assert_eq!(c.get(b"k").unwrap(), None);
436
437        c.set(b"a", b"1").unwrap();
438        c.set(b"b", b"2").unwrap();
439        assert_eq!(c.exists(&[&b"a"[..], &b"b"[..], &b"none"[..]]).unwrap(), 2);
440
441        assert_eq!(c.incr(b"counter").unwrap(), 1);
442        assert_eq!(c.incr_by(b"counter", 9).unwrap(), 10);
443
444        c.set(b"timed", b"x").unwrap();
445        assert!(c.expire(b"timed", Duration::from_secs(60)).unwrap());
446        let ttl = c.ttl_ms(b"timed").unwrap();
447        assert!((0..=60_000).contains(&ttl), "ttl_ms = {ttl}");
448        assert!(c.persist(b"timed").unwrap());
449        assert_eq!(c.ttl_ms(b"timed").unwrap(), -1);
450
451        assert_eq!(c.type_of(b"none").unwrap(), "none");
452        assert_eq!(c.type_of(b"timed").unwrap(), "string");
453
454        assert!(c.dbsize().unwrap() >= 3);
455        c.flush().unwrap();
456        assert_eq!(c.dbsize().unwrap(), 0);
457
458        c.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
459            .unwrap();
460        let ttl = c.ttl_ms(b"timed2").unwrap();
461        assert!((0..=60_000).contains(&ttl));
462    }
463
464    #[test]
465    fn anonymous_mem_publish_returns_zero() {
466        // No bus, no subscribers — by design.
467        let mut c = Connection::open("mem://").unwrap();
468        assert_eq!(c.publish(b"chan", b"hi").unwrap(), 0);
469    }
470
471    #[test]
472    fn embedded_mget_mset() {
473        let mut c = Connection::open("mem://").unwrap();
474        c.mset(&[
475            (b"a".as_ref(), b"1".as_ref()),
476            (b"b".as_ref(), b"2".as_ref()),
477        ])
478        .unwrap();
479        let got = c.mget(&[&b"a"[..], &b"b"[..], &b"missing"[..]]).unwrap();
480        assert_eq!(
481            got,
482            vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None]
483        );
484    }
485
486    #[test]
487    fn embedded_multi_rejected_unsupported() {
488        let mut c = Connection::open("mem://").unwrap();
489        let err = c.multi().unwrap_err();
490        assert_eq!(err.kind(), io::ErrorKind::Unsupported);
491    }
492}