Skip to main content

kevy_client/
lib.rs

1//! kevy-client — unified KV facade so downstream code can switch between
2//! in-process embedded and TCP-server backends with one URL string.
3//!
4//! ```no_run
5//! use kevy_client::Connection;
6//!
7//! // Same business code regardless of backend:
8//! let mut conn = Connection::open(std::env::var("MY_KEVY_URL").unwrap().as_str())?;
9//! conn.set(b"hello", b"world")?;
10//! assert_eq!(conn.get(b"hello")?, Some(b"world".to_vec()));
11//! # Ok::<(), std::io::Error>(())
12//! ```
13//!
14//! URL schemes:
15//! - `mem://`                       — in-process embedded, in-memory only
16//! - `file:///abs/path` /
17//!   `file://./rel/path`            — in-process embedded with persistence
18//! - `kevy://host[:port][/db]`      — TCP RESP, kevy-native scheme
19//! - `redis://host[:port][/db]`     — TCP RESP, standard Redis URL (alias)
20//! - `tcp://host[:port]`            — TCP RESP, raw (no SELECT round-trip)
21//!
22//! Auth (`redis://user:pass@…`) and TLS (`rediss://`) are rejected up front
23//! — kevy ships without either. v1.1.0 added the full string/hash/list/set/
24//! zset + one-shot `PUBLISH` surface. v1.2.0 added the pub/sub *consumer*
25//! side as a separate [`Subscriber`] type — a subscribed connection cannot
26//! send normal commands, so it needs its own socket and lives outside the
27//! `Connection` enum. The trait-vs-enum design decision is enum for now
28//! (closed two-backend universe); see ROADMAP for the trait extension path.
29
30#![forbid(unsafe_code)]
31#![warn(missing_docs)]
32
33use std::io;
34use std::path::PathBuf;
35use std::time::Duration;
36
37use kevy_embedded::{Config, Store};
38use kevy_resp::Reply;
39use kevy_resp_client::RespClient;
40
41mod subscribe;
42pub use subscribe::{PubsubEvent, Subscriber};
43
44/// One open connection to a kevy backend, opaque about whether the backend
45/// is in-process or over TCP.
46pub enum Connection {
47    /// In-process [`kevy_embedded::Store`].
48    Embedded(Store),
49    /// TCP [`kevy_resp_client::RespClient`].
50    Remote(RespClient),
51}
52
53impl Connection {
54    /// Open a backend chosen by URL scheme.
55    ///
56    /// See the crate-level docs for the supported URL forms.
57    pub fn open(url: &str) -> io::Result<Self> {
58        let parsed = parse_url(url)?;
59        match parsed {
60            Target::EmbedMemory => Ok(Self::Embedded(Store::open(Config::default())?)),
61            Target::EmbedPersist(path) => Ok(Self::Embedded(Store::open(
62                Config::default().with_persist(path),
63            )?)),
64            // RespClient::from_url already speaks kevy:// + redis:// + tcp://
65            // (added in v1.0.3) — delegate to it for the network targets so
66            // the URL parser stays in one place.
67            Target::Remote(url) => Ok(Self::Remote(RespClient::from_url(&url)?)),
68        }
69    }
70
71    /// `PING`. Returns `()` on `+PONG`, propagates any IO or RESP error.
72    /// The first thing every healthcheck calls.
73    pub fn ping(&mut self) -> io::Result<()> {
74        match self {
75            Self::Embedded(_) => Ok(()), // embedded is alive iff this method is called
76            Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
77                Reply::Simple(s) if s == b"PONG" => Ok(()),
78                Reply::Error(e) => Err(io::Error::other(string(e))),
79                other => Err(unexpected(other)),
80            },
81        }
82    }
83
84    /// `SET key value`. Unconditional set (no NX/XX). Returns `()` on success.
85    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
86        match self {
87            Self::Embedded(s) => s.set(key, value).map(|_| ()),
88            Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
89                Reply::Simple(s) if s == b"OK" => Ok(()),
90                Reply::Error(e) => Err(io::Error::other(string(e))),
91                other => Err(unexpected(other)),
92            },
93        }
94    }
95
96    /// `GET key`. `None` if absent or expired.
97    pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
98        match self {
99            Self::Embedded(s) => s.get(key),
100            Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
101                Reply::Bulk(v) => Ok(Some(v)),
102                Reply::Nil => Ok(None),
103                Reply::Error(e) => Err(io::Error::other(string(e))),
104                other => Err(unexpected(other)),
105            },
106        }
107    }
108
109    /// `DEL key [key ...]`. Returns the count of keys that were actually
110    /// removed (existing + dropped). Missing keys don't contribute.
111    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
112        match self {
113            Self::Embedded(s) => s.del(keys),
114            Self::Remote(c) => {
115                let mut args = Vec::with_capacity(keys.len() + 1);
116                args.push(b"DEL".to_vec());
117                args.extend(keys.iter().map(|k| k.to_vec()));
118                match c.request(&args)? {
119                    Reply::Int(n) if n >= 0 => Ok(n as usize),
120                    Reply::Error(e) => Err(io::Error::other(string(e))),
121                    other => Err(unexpected(other)),
122                }
123            }
124        }
125    }
126
127    /// `EXISTS key [key ...]`. Count of keys present (a single key can
128    /// contribute >1 if passed multiple times, matching Redis semantics).
129    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
130        match self {
131            Self::Embedded(s) => s.exists(keys),
132            Self::Remote(c) => {
133                let mut args = Vec::with_capacity(keys.len() + 1);
134                args.push(b"EXISTS".to_vec());
135                args.extend(keys.iter().map(|k| k.to_vec()));
136                match c.request(&args)? {
137                    Reply::Int(n) if n >= 0 => Ok(n as usize),
138                    Reply::Error(e) => Err(io::Error::other(string(e))),
139                    other => Err(unexpected(other)),
140                }
141            }
142        }
143    }
144
145    /// `INCR key`. Returns the post-increment value. Errors on non-integer
146    /// stored value.
147    pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
148        match self {
149            Self::Embedded(s) => s.incr(key),
150            Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
151                Reply::Int(n) => Ok(n),
152                Reply::Error(e) => Err(io::Error::other(string(e))),
153                other => Err(unexpected(other)),
154            },
155        }
156    }
157
158    /// `INCRBY key delta`. Negative delta is `DECRBY`. Returns post-value.
159    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
160        match self {
161            Self::Embedded(s) => s.incr_by(key, delta),
162            Self::Remote(c) => {
163                let args = vec![
164                    b"INCRBY".to_vec(),
165                    key.to_vec(),
166                    delta.to_string().into_bytes(),
167                ];
168                match c.request(&args)? {
169                    Reply::Int(n) => Ok(n),
170                    Reply::Error(e) => Err(io::Error::other(string(e))),
171                    other => Err(unexpected(other)),
172                }
173            }
174        }
175    }
176
177    /// `PEXPIRE key ttl_ms`. Returns whether the key existed and got a TTL.
178    pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
179        match self {
180            Self::Embedded(s) => s.expire(key, ttl),
181            Self::Remote(c) => {
182                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
183                let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
184                match c.request(&args)? {
185                    Reply::Int(1) => Ok(true),
186                    Reply::Int(0) => Ok(false),
187                    Reply::Error(e) => Err(io::Error::other(string(e))),
188                    other => Err(unexpected(other)),
189                }
190            }
191        }
192    }
193
194    /// `PERSIST key`. Returns whether a TTL was actually removed.
195    pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
196        match self {
197            Self::Embedded(s) => s.persist(key),
198            Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
199                Reply::Int(1) => Ok(true),
200                Reply::Int(0) => Ok(false),
201                Reply::Error(e) => Err(io::Error::other(string(e))),
202                other => Err(unexpected(other)),
203            },
204        }
205    }
206
207    /// `PTTL key`. Returns ms remaining, -2 if no key, -1 if key has no TTL.
208    pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
209        match self {
210            Self::Embedded(s) => Ok(s.ttl_ms(key)),
211            Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
212                Reply::Int(n) => Ok(n),
213                Reply::Error(e) => Err(io::Error::other(string(e))),
214                other => Err(unexpected(other)),
215            },
216        }
217    }
218
219    /// `TYPE key`. Returns the value's type as a Redis-style string (e.g.
220    /// `"string"`, `"hash"`, `"list"`, `"set"`, `"zset"`, or `"none"` if
221    /// the key doesn't exist).
222    pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
223        match self {
224            Self::Embedded(s) => Ok(s.type_of(key).to_string()),
225            Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
226                Reply::Simple(s) => Ok(string(s)),
227                Reply::Error(e) => Err(io::Error::other(string(e))),
228                other => Err(unexpected(other)),
229            },
230        }
231    }
232
233    /// `DBSIZE`. Total live keys at the time of the call.
234    pub fn dbsize(&mut self) -> io::Result<usize> {
235        match self {
236            Self::Embedded(s) => Ok(s.dbsize()),
237            Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
238                Reply::Int(n) if n >= 0 => Ok(n as usize),
239                Reply::Error(e) => Err(io::Error::other(string(e))),
240                other => Err(unexpected(other)),
241            },
242        }
243    }
244
245    /// `FLUSHDB`. Drops every key. Persistence remains opted-in; embedded
246    /// `with_persist` will rewrite the AOF on its next sync cycle.
247    pub fn flush(&mut self) -> io::Result<()> {
248        match self {
249            Self::Embedded(s) => s.flush(),
250            Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
251                Reply::Simple(s) if s == b"OK" => Ok(()),
252                Reply::Error(e) => Err(io::Error::other(string(e))),
253                other => Err(unexpected(other)),
254            },
255        }
256    }
257
258    /// `SET key value PX ttl_ms`. Convenience for the common
259    /// "cache with expiry" pattern; equivalent to `set` + `expire` but
260    /// atomic.
261    pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
262        match self {
263            Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
264            Self::Remote(c) => {
265                let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
266                let args = vec![
267                    b"SET".to_vec(),
268                    key.to_vec(),
269                    value.to_vec(),
270                    b"PX".to_vec(),
271                    ms.to_string().into_bytes(),
272                ];
273                match c.request(&args)? {
274                    Reply::Simple(s) if s == b"OK" => Ok(()),
275                    Reply::Error(e) => Err(io::Error::other(string(e))),
276                    other => Err(unexpected(other)),
277                }
278            }
279        }
280    }
281
282    // ===== Hash =====
283
284    /// `HSET key field value [field value ...]`. Returns the number of
285    /// fields that were newly added (not overwrites).
286    pub fn hset(&mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
287        match self {
288            Self::Embedded(s) => s.hset(key, pairs),
289            Self::Remote(c) => {
290                let mut args = Vec::with_capacity(2 + pairs.len() * 2);
291                args.push(b"HSET".to_vec());
292                args.push(key.to_vec());
293                for (f, v) in pairs {
294                    args.push(f.to_vec());
295                    args.push(v.to_vec());
296                }
297                match c.request(&args)? {
298                    Reply::Int(n) if n >= 0 => Ok(n as usize),
299                    Reply::Error(e) => Err(io::Error::other(string(e))),
300                    other => Err(unexpected(other)),
301                }
302            }
303        }
304    }
305
306    /// `HGET key field`. `None` when the key or field is absent.
307    pub fn hget(&mut self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
308        match self {
309            Self::Embedded(s) => s.hget(key, field),
310            Self::Remote(c) => match c.request(&vec3(b"HGET", key, field))? {
311                Reply::Bulk(v) => Ok(Some(v)),
312                Reply::Nil => Ok(None),
313                Reply::Error(e) => Err(io::Error::other(string(e))),
314                other => Err(unexpected(other)),
315            },
316        }
317    }
318
319    /// `HDEL key field [field ...]`. Returns the number of fields actually
320    /// removed.
321    pub fn hdel(&mut self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
322        match self {
323            Self::Embedded(s) => s.hdel(key, fields),
324            Self::Remote(c) => {
325                let mut args = Vec::with_capacity(fields.len() + 2);
326                args.push(b"HDEL".to_vec());
327                args.push(key.to_vec());
328                args.extend(fields.iter().map(|f| f.to_vec()));
329                match c.request(&args)? {
330                    Reply::Int(n) if n >= 0 => Ok(n as usize),
331                    Reply::Error(e) => Err(io::Error::other(string(e))),
332                    other => Err(unexpected(other)),
333                }
334            }
335        }
336    }
337
338    /// `HLEN key`. Number of fields in the hash (0 if absent).
339    pub fn hlen(&mut self, key: &[u8]) -> io::Result<usize> {
340        match self {
341            Self::Embedded(s) => s.with(|inner| inner.hlen(key)).map_err(store_err),
342            Self::Remote(c) => match c.request(&vec2(b"HLEN", key))? {
343                Reply::Int(n) if n >= 0 => Ok(n as usize),
344                Reply::Error(e) => Err(io::Error::other(string(e))),
345                other => Err(unexpected(other)),
346            },
347        }
348    }
349
350    /// `HGETALL key`. Returns a flat `[f0, v0, f1, v1, ...]` matching the
351    /// Redis wire shape; empty when the key is absent.
352    pub fn hgetall(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
353        match self {
354            Self::Embedded(s) => s.with(|inner| inner.hgetall(key)).map_err(store_err),
355            Self::Remote(c) => match c.request(&vec2(b"HGETALL", key))? {
356                Reply::Array(items) => array_to_bulks(items),
357                Reply::Error(e) => Err(io::Error::other(string(e))),
358                other => Err(unexpected(other)),
359            },
360        }
361    }
362
363    /// `HKEYS key`. Returns the hash's field names (empty if absent).
364    pub fn hkeys(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
365        match self {
366            Self::Embedded(s) => s.with(|inner| inner.hkeys(key)).map_err(store_err),
367            Self::Remote(c) => match c.request(&vec2(b"HKEYS", key))? {
368                Reply::Array(items) => array_to_bulks(items),
369                Reply::Error(e) => Err(io::Error::other(string(e))),
370                other => Err(unexpected(other)),
371            },
372        }
373    }
374
375    /// `HVALS key`. Returns the hash's values (empty if absent).
376    pub fn hvals(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
377        match self {
378            Self::Embedded(s) => s.with(|inner| inner.hvals(key)).map_err(store_err),
379            Self::Remote(c) => match c.request(&vec2(b"HVALS", key))? {
380                Reply::Array(items) => array_to_bulks(items),
381                Reply::Error(e) => Err(io::Error::other(string(e))),
382                other => Err(unexpected(other)),
383            },
384        }
385    }
386
387    // ===== List =====
388
389    /// `LPUSH key value [value ...]`. Returns the new list length.
390    pub fn lpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
391        match self {
392            Self::Embedded(s) => s.lpush(key, values),
393            Self::Remote(c) => list_push(c, b"LPUSH", key, values),
394        }
395    }
396
397    /// `RPUSH key value [value ...]`. Returns the new list length.
398    pub fn rpush(&mut self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
399        match self {
400            Self::Embedded(s) => s.rpush(key, values),
401            Self::Remote(c) => list_push(c, b"RPUSH", key, values),
402        }
403    }
404
405    /// `LPOP key count`. Returns up to `count` values from the head; empty
406    /// when the key is absent or already drained.
407    pub fn lpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
408        match self {
409            Self::Embedded(s) => s.lpop(key, count),
410            Self::Remote(c) => list_pop(c, b"LPOP", key, count),
411        }
412    }
413
414    /// `RPOP key count`. Symmetric to [`Self::lpop`] from the tail.
415    pub fn rpop(&mut self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
416        match self {
417            Self::Embedded(s) => s.rpop(key, count),
418            Self::Remote(c) => list_pop(c, b"RPOP", key, count),
419        }
420    }
421
422    /// `LLEN key`. 0 when the key is absent.
423    pub fn llen(&mut self, key: &[u8]) -> io::Result<usize> {
424        match self {
425            Self::Embedded(s) => s.llen(key),
426            Self::Remote(c) => match c.request(&vec2(b"LLEN", key))? {
427                Reply::Int(n) if n >= 0 => Ok(n as usize),
428                Reply::Error(e) => Err(io::Error::other(string(e))),
429                other => Err(unexpected(other)),
430            },
431        }
432    }
433
434    /// `LRANGE key start stop`. Redis-style indexing — negative offsets
435    /// count from the tail (`-1` = last element).
436    pub fn lrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
437        match self {
438            Self::Embedded(s) => s
439                .with(|inner| inner.lrange(key, start, stop))
440                .map_err(store_err),
441            Self::Remote(c) => {
442                let args = vec![
443                    b"LRANGE".to_vec(),
444                    key.to_vec(),
445                    start.to_string().into_bytes(),
446                    stop.to_string().into_bytes(),
447                ];
448                match c.request(&args)? {
449                    Reply::Array(items) => array_to_bulks(items),
450                    Reply::Error(e) => Err(io::Error::other(string(e))),
451                    other => Err(unexpected(other)),
452                }
453            }
454        }
455    }
456
457    // ===== Set =====
458
459    /// `SADD key member [member ...]`. Returns count of newly added members.
460    pub fn sadd(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
461        match self {
462            Self::Embedded(s) => s.sadd(key, members),
463            Self::Remote(c) => set_multi(c, b"SADD", key, members),
464        }
465    }
466
467    /// `SREM key member [member ...]`. Returns count of removed members.
468    pub fn srem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
469        match self {
470            Self::Embedded(s) => s.srem(key, members),
471            Self::Remote(c) => set_multi(c, b"SREM", key, members),
472        }
473    }
474
475    /// `SMEMBERS key`. Order is implementation-defined; empty if absent.
476    pub fn smembers(&mut self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
477        match self {
478            Self::Embedded(s) => s.smembers(key),
479            Self::Remote(c) => match c.request(&vec2(b"SMEMBERS", key))? {
480                Reply::Array(items) => array_to_bulks(items),
481                Reply::Error(e) => Err(io::Error::other(string(e))),
482                other => Err(unexpected(other)),
483            },
484        }
485    }
486
487    /// `SCARD key`. 0 when the key is absent.
488    pub fn scard(&mut self, key: &[u8]) -> io::Result<usize> {
489        match self {
490            Self::Embedded(s) => s.scard(key),
491            Self::Remote(c) => match c.request(&vec2(b"SCARD", key))? {
492                Reply::Int(n) if n >= 0 => Ok(n as usize),
493                Reply::Error(e) => Err(io::Error::other(string(e))),
494                other => Err(unexpected(other)),
495            },
496        }
497    }
498
499    /// `SISMEMBER key member`. `false` when key or member absent.
500    pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> io::Result<bool> {
501        match self {
502            Self::Embedded(s) => s
503                .with(|inner| inner.sismember(key, member))
504                .map_err(store_err),
505            Self::Remote(c) => match c.request(&vec3(b"SISMEMBER", key, member))? {
506                Reply::Int(1) => Ok(true),
507                Reply::Int(0) => Ok(false),
508                Reply::Error(e) => Err(io::Error::other(string(e))),
509                other => Err(unexpected(other)),
510            },
511        }
512    }
513
514    // ===== Sorted set =====
515
516    /// `ZADD key score member [score member ...]`. Returns count of newly
517    /// added members (overwrites don't count).
518    pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
519        match self {
520            Self::Embedded(s) => s.zadd(key, pairs),
521            Self::Remote(c) => {
522                let mut args = Vec::with_capacity(2 + pairs.len() * 2);
523                args.push(b"ZADD".to_vec());
524                args.push(key.to_vec());
525                for (score, m) in pairs {
526                    args.push(score.to_string().into_bytes());
527                    args.push(m.to_vec());
528                }
529                match c.request(&args)? {
530                    Reply::Int(n) if n >= 0 => Ok(n as usize),
531                    Reply::Error(e) => Err(io::Error::other(string(e))),
532                    other => Err(unexpected(other)),
533                }
534            }
535        }
536    }
537
538    /// `ZREM key member [member ...]`. Returns count of removed members.
539    pub fn zrem(&mut self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
540        match self {
541            Self::Embedded(s) => s.zrem(key, members),
542            Self::Remote(c) => set_multi(c, b"ZREM", key, members),
543        }
544    }
545
546    /// `ZSCORE key member`. `None` if absent.
547    pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
548        match self {
549            Self::Embedded(s) => s.zscore(key, member),
550            Self::Remote(c) => match c.request(&vec3(b"ZSCORE", key, member))? {
551                Reply::Bulk(v) => {
552                    let s = std::str::from_utf8(&v)
553                        .map_err(|_| io::Error::other("non-utf8 score reply"))?;
554                    let n: f64 = s
555                        .parse()
556                        .map_err(|_| io::Error::other(format!("bad score float: {s}")))?;
557                    Ok(Some(n))
558                }
559                Reply::Nil => Ok(None),
560                Reply::Error(e) => Err(io::Error::other(string(e))),
561                other => Err(unexpected(other)),
562            },
563        }
564    }
565
566    /// `ZCARD key`. Number of members; 0 if absent.
567    pub fn zcard(&mut self, key: &[u8]) -> io::Result<usize> {
568        match self {
569            Self::Embedded(s) => s.zcard(key),
570            Self::Remote(c) => match c.request(&vec2(b"ZCARD", key))? {
571                Reply::Int(n) if n >= 0 => Ok(n as usize),
572                Reply::Error(e) => Err(io::Error::other(string(e))),
573                other => Err(unexpected(other)),
574            },
575        }
576    }
577
578    /// `ZRANGE key start stop`. Ascending-score order; negative indices
579    /// count from the tail.
580    pub fn zrange(&mut self, key: &[u8], start: i64, stop: i64) -> io::Result<Vec<Vec<u8>>> {
581        match self {
582            Self::Embedded(s) => s
583                .with(|inner| inner.zrange(key, start, stop))
584                .map(|pairs| pairs.into_iter().map(|(m, _score)| m).collect())
585                .map_err(store_err),
586            Self::Remote(c) => {
587                let args = vec![
588                    b"ZRANGE".to_vec(),
589                    key.to_vec(),
590                    start.to_string().into_bytes(),
591                    stop.to_string().into_bytes(),
592                ];
593                match c.request(&args)? {
594                    Reply::Array(items) => array_to_bulks(items),
595                    Reply::Error(e) => Err(io::Error::other(string(e))),
596                    other => Err(unexpected(other)),
597                }
598            }
599        }
600    }
601
602    // ===== Pub/sub =====
603
604    /// `PUBLISH channel message`. Returns the count of subscribers
605    /// that received the message.
606    ///
607    /// On the embedded backend there are no subscribers (single process,
608    /// no reactor), so this returns `Ok(0)` — matching Redis's
609    /// "publish to a channel with no listeners" semantic.
610    ///
611    /// The pub/sub *consumer* side lives in [`Subscriber`], which takes
612    /// its own dedicated TCP connection: a subscribed connection cannot
613    /// send normal commands per the Redis/RESP spec, so it can't share
614    /// a socket with this `Connection`.
615    pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
616        match self {
617            Self::Embedded(_) => Ok(0),
618            Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
619                Reply::Int(n) if n >= 0 => Ok(n as usize),
620                Reply::Error(e) => Err(io::Error::other(string(e))),
621                other => Err(unexpected(other)),
622            },
623        }
624    }
625}
626
627// Helpers for the multi-arg list / set commands — both backends accept a
628// slice of byte-slices, but the RESP path builds the args vector itself.
629
630fn list_push(
631    c: &mut RespClient,
632    verb: &[u8],
633    key: &[u8],
634    values: &[&[u8]],
635) -> io::Result<usize> {
636    let mut args = Vec::with_capacity(values.len() + 2);
637    args.push(verb.to_vec());
638    args.push(key.to_vec());
639    args.extend(values.iter().map(|v| v.to_vec()));
640    match c.request(&args)? {
641        Reply::Int(n) if n >= 0 => Ok(n as usize),
642        Reply::Error(e) => Err(io::Error::other(string(e))),
643        other => Err(unexpected(other)),
644    }
645}
646
647fn list_pop(
648    c: &mut RespClient,
649    verb: &[u8],
650    key: &[u8],
651    count: usize,
652) -> io::Result<Vec<Vec<u8>>> {
653    let args = vec![verb.to_vec(), key.to_vec(), count.to_string().into_bytes()];
654    match c.request(&args)? {
655        Reply::Array(items) => array_to_bulks(items),
656        Reply::Bulk(v) => Ok(vec![v]),
657        Reply::Nil => Ok(Vec::new()),
658        Reply::Error(e) => Err(io::Error::other(string(e))),
659        other => Err(unexpected(other)),
660    }
661}
662
663fn set_multi(
664    c: &mut RespClient,
665    verb: &[u8],
666    key: &[u8],
667    members: &[&[u8]],
668) -> io::Result<usize> {
669    let mut args = Vec::with_capacity(members.len() + 2);
670    args.push(verb.to_vec());
671    args.push(key.to_vec());
672    args.extend(members.iter().map(|m| m.to_vec()));
673    match c.request(&args)? {
674        Reply::Int(n) if n >= 0 => Ok(n as usize),
675        Reply::Error(e) => Err(io::Error::other(string(e))),
676        other => Err(unexpected(other)),
677    }
678}
679
680fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
681    items
682        .into_iter()
683        .map(|r| match r {
684            Reply::Bulk(v) => Ok(v),
685            Reply::Simple(v) => Ok(v),
686            Reply::Nil => Ok(Vec::new()),
687            other => Err(unexpected(other)),
688        })
689        .collect()
690}
691
692fn store_err(e: kevy_embedded::StoreError) -> io::Error {
693    io::Error::other(format!("kevy-store: {e:?}"))
694}
695
696// =====================================================================
697// URL parsing
698// =====================================================================
699
/// Backend selected by [`parse_url`] for a given URL string.
#[derive(Debug)]
enum Target {
    /// `mem://` — embedded store kept purely in memory.
    EmbedMemory,
    /// `file://path` — embedded store persisted at `path`.
    EmbedPersist(PathBuf),
    /// `kevy://…` / `redis://…` / `tcp://…` — the URL exactly as given,
    /// to be handed to RespClient.
    Remote(String),
}

/// Classify `url` by its scheme.
///
/// Everything after `://` must be empty for `mem`, is taken verbatim as the
/// path for `file`, and is left untouched for the network schemes (the
/// whole URL is forwarded).
///
/// # Errors
///
/// `InvalidInput` when `://` is missing, when `mem://` has trailing text,
/// when `file://` has no path, or when the scheme is unknown;
/// `Unsupported` for the TLS schemes `rediss` and `kevys`.
fn parse_url(url: &str) -> io::Result<Target> {
    let Some((scheme, rest)) = url.split_once("://") else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "URL missing '://'",
        ));
    };
    match scheme {
        "mem" if rest.is_empty() => Ok(Target::EmbedMemory),
        "mem" => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "mem:// URL must be empty after the scheme (e.g. `mem://`)",
        )),
        "file" if rest.is_empty() => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "file:// URL must include a path (e.g. `file:///var/lib/myapp`)",
        )),
        // file:///abs/path → "/abs/path"; file://./rel → "./rel". The
        // triple-slash form is the standard absolute-path file URI, so a
        // leading `/` is simply kept as part of the path.
        "file" => Ok(Target::EmbedPersist(PathBuf::from(rest))),
        "kevy" | "redis" | "tcp" => Ok(Target::Remote(url.to_owned())),
        "rediss" | "kevys" => Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "TLS schemes (rediss://, kevys://) are unsupported — kevy has no TLS",
        )),
        unknown => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown URL scheme '{unknown}://'"),
        )),
    }
}
749
750// =====================================================================
751// Small RESP helpers
752// =====================================================================
753
/// Build the two-element argument vector `[verb, a]` as owned byte strings.
fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
    [verb, a].iter().map(|part| part.to_vec()).collect()
}
757
/// Build the three-element argument vector `[verb, a, b]` as owned byte
/// strings.
fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
    [verb, a, b].iter().map(|part| part.to_vec()).collect()
}
761
/// Decode bytes as text, substituting U+FFFD for invalid UTF-8 sequences.
fn string(b: Vec<u8>) -> String {
    let lossy = String::from_utf8_lossy(b.as_slice());
    lossy.to_string()
}
765
766fn unexpected(r: Reply) -> io::Error {
767    let kind = match r {
768        Reply::Simple(_) => "simple-string",
769        Reply::Error(_) => "error",
770        Reply::Int(_) => "integer",
771        Reply::Bulk(_) => "bulk-string",
772        Reply::Nil => "nil",
773        Reply::Array(_) => "array",
774    };
775    io::Error::other(format!("unexpected RESP reply variant: {kind}"))
776}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Owned copy of a string's bytes, used to spell out expected values.
    fn bytes(s: &str) -> Vec<u8> {
        s.as_bytes().to_vec()
    }

    #[test]
    fn parse_mem_url() {
        // Only the bare scheme is accepted; any trailing text is an error.
        let bare = parse_url("mem://").unwrap();
        assert!(matches!(bare, Target::EmbedMemory));
        assert!(parse_url("mem://something").is_err());
    }

    #[test]
    fn parse_file_url() {
        // Whatever follows `file://` becomes the path unchanged, so the
        // triple-slash form yields an absolute path.
        let cases = [
            ("file:///var/lib/myapp", "/var/lib/myapp"),
            ("file://./data", "./data"),
        ];
        for (url, expected) in cases {
            let Target::EmbedPersist(path) = parse_url(url).unwrap() else {
                panic!("wrong variant");
            };
            assert_eq!(path, PathBuf::from(expected));
        }
        assert!(parse_url("file://").is_err());
    }

    #[test]
    fn parse_remote_urls_delegate() {
        // Remote schemes carry the whole URL through untouched.
        for url in ["kevy://h:6379", "redis://h:6379/0", "tcp://h:6379"] {
            let Target::Remote(delegated) = parse_url(url).unwrap() else {
                panic!("wrong variant");
            };
            assert_eq!(delegated, url);
        }
    }

    #[test]
    fn parse_tls_rejected() {
        let err = parse_url("rediss://h:6379").unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Unsupported);
    }

    #[test]
    fn parse_unknown_scheme_rejected() {
        let err = parse_url("memcached://h:11211").unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    // Functional smoke tests against the embedded `mem://` backend. (A smoke
    // test of the remote backend needs a running server; see
    // crates/kevy/tests/ for that integration in the next pass.)
    #[test]
    fn embedded_mem_full_crud_round_trip() {
        let mut conn = Connection::open("mem://").unwrap();
        conn.ping().unwrap();

        // set / get round trip.
        conn.set(b"k", b"v").unwrap();
        let fetched = conn.get(b"k").unwrap();
        assert_eq!(fetched, Some(bytes("v")));

        // del counts only the keys that existed.
        let removed = conn.del(&[&b"k"[..], &b"missing"[..]]).unwrap();
        assert_eq!(removed, 1);
        assert!(conn.get(b"k").unwrap().is_none());

        // exists counts each key that is present.
        conn.set(b"a", b"1").unwrap();
        conn.set(b"b", b"2").unwrap();
        let present = conn.exists(&[&b"a"[..], &b"b"[..], &b"none"[..]]).unwrap();
        assert_eq!(present, 2);

        // incr / incr_by: a fresh counter starts from 0.
        assert_eq!(conn.incr(b"counter").unwrap(), 1);
        assert_eq!(conn.incr_by(b"counter", 9).unwrap(), 10);

        // expire, then ttl_ms, then persist.
        conn.set(b"timed", b"x").unwrap();
        let armed = conn.expire(b"timed", Duration::from_secs(60)).unwrap();
        assert!(armed);
        let ttl = conn.ttl_ms(b"timed").unwrap();
        assert!((0..=60_000).contains(&ttl), "ttl_ms = {ttl}");
        let cleared = conn.persist(b"timed").unwrap();
        assert!(cleared);
        // With the expiry removed, ttl_ms reports -1.
        assert_eq!(conn.ttl_ms(b"timed").unwrap(), -1);

        // type_of for an absent key and for a present one.
        assert_eq!(conn.type_of(b"none").unwrap(), "none");
        assert_eq!(conn.type_of(b"timed").unwrap(), "string");

        // dbsize before and after flush.
        let populated = conn.dbsize().unwrap();
        assert!(populated >= 3);
        conn.flush().unwrap();
        assert_eq!(conn.dbsize().unwrap(), 0);

        // set_with_ttl stores the value and its expiry in a single call.
        conn.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
            .unwrap();
        let ttl = conn.ttl_ms(b"timed2").unwrap();
        assert!((0..=60_000).contains(&ttl));
    }

    #[test]
    fn embedded_hash_methods() {
        let mut conn = Connection::open("mem://").unwrap();
        let pairs: &[(&[u8], &[u8])] = &[
            (b"name".as_ref(), b"alice".as_ref()),
            (b"age".as_ref(), b"30".as_ref()),
        ];
        let created = conn.hset(b"u:1", pairs).unwrap();
        assert_eq!(created, 2);
        assert_eq!(conn.hget(b"u:1", b"name").unwrap(), Some(bytes("alice")));
        assert!(conn.hget(b"u:1", b"missing").unwrap().is_none());
        assert_eq!(conn.hlen(b"u:1").unwrap(), 2);

        // hgetall is a flat field/value list; only membership is checked.
        let mut flat = conn.hgetall(b"u:1").unwrap();
        flat.sort_unstable();
        assert!(flat.contains(&bytes("alice")));
        assert!(flat.contains(&bytes("name")));

        // Field and value order is not relied upon, so sort before comparing.
        let mut fields = conn.hkeys(b"u:1").unwrap();
        fields.sort_unstable();
        assert_eq!(fields, vec![bytes("age"), bytes("name")]);

        let mut values = conn.hvals(b"u:1").unwrap();
        values.sort_unstable();
        assert_eq!(values, vec![bytes("30"), bytes("alice")]);

        let dropped = conn.hdel(b"u:1", &[&b"age"[..], &b"missing"[..]]).unwrap();
        assert_eq!(dropped, 1);
        assert_eq!(conn.hlen(b"u:1").unwrap(), 1);
    }

    #[test]
    fn embedded_list_methods() {
        let mut conn = Connection::open("mem://").unwrap();
        let after_rpush = conn.rpush(b"q", &[&b"a"[..], &b"b"[..], &b"c"[..]]).unwrap();
        assert_eq!(after_rpush, 3);
        let after_lpush = conn.lpush(b"q", &[&b"z"[..]]).unwrap();
        assert_eq!(after_lpush, 4);
        assert_eq!(conn.llen(b"q").unwrap(), 4);

        // The list now reads z, a, b, c from head to tail.
        let everything = conn.lrange(b"q", 0, -1).unwrap();
        assert_eq!(
            everything,
            vec![bytes("z"), bytes("a"), bytes("b"), bytes("c")]
        );

        let from_head = conn.lpop(b"q", 2).unwrap();
        assert_eq!(from_head, vec![bytes("z"), bytes("a")]);
        let from_tail = conn.rpop(b"q", 1).unwrap();
        assert_eq!(from_tail, vec![bytes("c")]);
        assert_eq!(conn.llen(b"q").unwrap(), 1);
    }

    #[test]
    fn embedded_set_methods() {
        let mut conn = Connection::open("mem://").unwrap();
        // The repeated member is counted once.
        let added = conn.sadd(b"s", &[&b"a"[..], &b"b"[..], &b"a"[..]]).unwrap();
        assert_eq!(added, 2);
        assert_eq!(conn.scard(b"s").unwrap(), 2);
        assert!(conn.sismember(b"s", b"a").unwrap());
        assert!(!conn.sismember(b"s", b"missing").unwrap());

        let mut members = conn.smembers(b"s").unwrap();
        members.sort_unstable();
        assert_eq!(members, vec![bytes("a"), bytes("b")]);

        let removed = conn.srem(b"s", &[&b"a"[..]]).unwrap();
        assert_eq!(removed, 1);
        assert_eq!(conn.scard(b"s").unwrap(), 1);
    }

    #[test]
    fn embedded_zset_methods() {
        let mut conn = Connection::open("mem://").unwrap();
        let pairs: &[(f64, &[u8])] = &[
            (100.0, b"alice".as_ref()),
            (200.0, b"bob".as_ref()),
            (50.0, b"carol".as_ref()),
        ];
        let inserted = conn.zadd(b"lb", pairs).unwrap();
        assert_eq!(inserted, 3);
        assert_eq!(conn.zscore(b"lb", b"bob").unwrap(), Some(200.0));
        assert_eq!(conn.zscore(b"lb", b"none").unwrap(), None);
        assert_eq!(conn.zcard(b"lb").unwrap(), 3);

        // The full range comes back in ascending score order: carol, alice, bob.
        let ranked = conn.zrange(b"lb", 0, -1).unwrap();
        assert_eq!(ranked, vec![bytes("carol"), bytes("alice"), bytes("bob")]);

        let removed = conn.zrem(b"lb", &[&b"carol"[..]]).unwrap();
        assert_eq!(removed, 1);
        assert_eq!(conn.zcard(b"lb").unwrap(), 2);
    }

    #[test]
    fn embedded_publish_returns_zero() {
        // A single-process embed has no subscribers, which matches the
        // semantics of publishing to a channel nobody listens to.
        let mut conn = Connection::open("mem://").unwrap();
        let receivers = conn.publish(b"chan", b"hi").unwrap();
        assert_eq!(receivers, 0);
    }
}