1#![forbid(unsafe_code)]
35#![warn(missing_docs)]
36
37use std::io;
38use std::time::Duration;
39
40use kevy_embedded::Store;
41use kevy_resp::Reply;
42use kevy_resp_client::RespClient;
43
44mod collections;
45mod scan;
46mod subscribe;
47mod transaction;
48mod url;
49
50pub use subscribe::{PubsubEvent, Subscriber};
51pub use transaction::Transaction;
52
53pub(crate) use url::{Target, parse_url, resolve_store};
54
/// A handle to a kevy store, backed either by an in-process [`Store`] or by a
/// RESP client.
///
/// Values are created with [`Connection::open`], which selects the variant
/// from the parsed URL. The methods on this type dispatch on the variant.
pub enum Connection {
    /// Operations are executed directly against the wrapped [`Store`].
    Embedded(Store),
    /// Operations are sent as commands through the wrapped [`RespClient`].
    Remote(RespClient),
}
63
64impl Connection {
65 pub fn open(url: &str) -> io::Result<Self> {
73 let parsed = parse_url(url)?;
74 match parsed {
75 Target::Remote(remote_url) => Ok(Self::Remote(RespClient::from_url(&remote_url)?)),
76 embed => Ok(Self::Embedded(resolve_store(&embed)?)),
77 }
78 }
79
80 pub fn ping(&mut self) -> io::Result<()> {
83 match self {
84 Self::Embedded(_) => Ok(()),
85 Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
86 Reply::Simple(s) if s == b"PONG" => Ok(()),
87 Reply::Error(e) => Err(io::Error::other(string(e))),
88 other => Err(unexpected(other)),
89 },
90 }
91 }
92
93 pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
95 match self {
96 Self::Embedded(s) => s.set(key, value).map(|_| ()),
97 Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
98 Reply::Simple(s) if s == b"OK" => Ok(()),
99 Reply::Error(e) => Err(io::Error::other(string(e))),
100 other => Err(unexpected(other)),
101 },
102 }
103 }
104
105 pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
107 match self {
108 Self::Embedded(s) => s.get(key),
109 Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
110 Reply::Bulk(v) => Ok(Some(v)),
111 Reply::Nil => Ok(None),
112 Reply::Error(e) => Err(io::Error::other(string(e))),
113 other => Err(unexpected(other)),
114 },
115 }
116 }
117
118 pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
121 match self {
122 Self::Embedded(s) => s.del(keys),
123 Self::Remote(c) => {
124 let mut args = Vec::with_capacity(keys.len() + 1);
125 args.push(b"DEL".to_vec());
126 args.extend(keys.iter().map(|k| k.to_vec()));
127 match c.request(&args)? {
128 Reply::Int(n) if n >= 0 => Ok(n as usize),
129 Reply::Error(e) => Err(io::Error::other(string(e))),
130 other => Err(unexpected(other)),
131 }
132 }
133 }
134 }
135
136 pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
139 match self {
140 Self::Embedded(s) => s.exists(keys),
141 Self::Remote(c) => {
142 let mut args = Vec::with_capacity(keys.len() + 1);
143 args.push(b"EXISTS".to_vec());
144 args.extend(keys.iter().map(|k| k.to_vec()));
145 match c.request(&args)? {
146 Reply::Int(n) if n >= 0 => Ok(n as usize),
147 Reply::Error(e) => Err(io::Error::other(string(e))),
148 other => Err(unexpected(other)),
149 }
150 }
151 }
152 }
153
154 pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
157 match self {
158 Self::Embedded(s) => s.incr(key),
159 Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
160 Reply::Int(n) => Ok(n),
161 Reply::Error(e) => Err(io::Error::other(string(e))),
162 other => Err(unexpected(other)),
163 },
164 }
165 }
166
167 pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
169 match self {
170 Self::Embedded(s) => s.incr_by(key, delta),
171 Self::Remote(c) => {
172 let args = vec![
173 b"INCRBY".to_vec(),
174 key.to_vec(),
175 delta.to_string().into_bytes(),
176 ];
177 match c.request(&args)? {
178 Reply::Int(n) => Ok(n),
179 Reply::Error(e) => Err(io::Error::other(string(e))),
180 other => Err(unexpected(other)),
181 }
182 }
183 }
184 }
185
186 pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
188 match self {
189 Self::Embedded(s) => s.expire(key, ttl),
190 Self::Remote(c) => {
191 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
192 let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
193 match c.request(&args)? {
194 Reply::Int(1) => Ok(true),
195 Reply::Int(0) => Ok(false),
196 Reply::Error(e) => Err(io::Error::other(string(e))),
197 other => Err(unexpected(other)),
198 }
199 }
200 }
201 }
202
203 pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
205 match self {
206 Self::Embedded(s) => s.persist(key),
207 Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
208 Reply::Int(1) => Ok(true),
209 Reply::Int(0) => Ok(false),
210 Reply::Error(e) => Err(io::Error::other(string(e))),
211 other => Err(unexpected(other)),
212 },
213 }
214 }
215
216 pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
218 match self {
219 Self::Embedded(s) => Ok(s.ttl_ms(key)),
220 Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
221 Reply::Int(n) => Ok(n),
222 Reply::Error(e) => Err(io::Error::other(string(e))),
223 other => Err(unexpected(other)),
224 },
225 }
226 }
227
228 pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
232 match self {
233 Self::Embedded(s) => Ok(s.type_of(key).to_string()),
234 Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
235 Reply::Simple(s) => Ok(string(s)),
236 Reply::Error(e) => Err(io::Error::other(string(e))),
237 other => Err(unexpected(other)),
238 },
239 }
240 }
241
242 pub fn dbsize(&mut self) -> io::Result<usize> {
244 match self {
245 Self::Embedded(s) => Ok(s.dbsize()),
246 Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
247 Reply::Int(n) if n >= 0 => Ok(n as usize),
248 Reply::Error(e) => Err(io::Error::other(string(e))),
249 other => Err(unexpected(other)),
250 },
251 }
252 }
253
254 pub fn flush(&mut self) -> io::Result<()> {
257 match self {
258 Self::Embedded(s) => s.flush(),
259 Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
260 Reply::Simple(s) if s == b"OK" => Ok(()),
261 Reply::Error(e) => Err(io::Error::other(string(e))),
262 other => Err(unexpected(other)),
263 },
264 }
265 }
266
267 pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
271 match self {
272 Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
273 Self::Remote(c) => {
274 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
275 let args = vec![
276 b"SET".to_vec(),
277 key.to_vec(),
278 value.to_vec(),
279 b"PX".to_vec(),
280 ms.to_string().into_bytes(),
281 ];
282 match c.request(&args)? {
283 Reply::Simple(s) if s == b"OK" => Ok(()),
284 Reply::Error(e) => Err(io::Error::other(string(e))),
285 other => Err(unexpected(other)),
286 }
287 }
288 }
289 }
290
291 pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
294 match self {
295 Self::Embedded(s) => keys.iter().map(|k| s.get(k)).collect(),
296 Self::Remote(c) => {
297 let mut args = Vec::with_capacity(keys.len() + 1);
298 args.push(b"MGET".to_vec());
299 args.extend(keys.iter().map(|k| k.to_vec()));
300 match c.request(&args)? {
301 Reply::Array(items) => items
302 .into_iter()
303 .map(|r| match r {
304 Reply::Bulk(v) => Ok(Some(v)),
305 Reply::Nil => Ok(None),
306 other => Err(unexpected(other)),
307 })
308 .collect(),
309 Reply::Error(e) => Err(io::Error::other(string(e))),
310 other => Err(unexpected(other)),
311 }
312 }
313 }
314 }
315
316 pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
318 match self {
319 Self::Embedded(s) => {
320 for (k, v) in pairs {
321 s.set(k, v)?;
322 }
323 Ok(())
324 }
325 Self::Remote(c) => {
326 let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
327 args.push(b"MSET".to_vec());
328 for (k, v) in pairs {
329 args.push(k.to_vec());
330 args.push(v.to_vec());
331 }
332 match c.request(&args)? {
333 Reply::Simple(s) if s == b"OK" => Ok(()),
334 Reply::Error(e) => Err(io::Error::other(string(e))),
335 other => Err(unexpected(other)),
336 }
337 }
338 }
339 }
340
341 pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
355 match self {
356 Self::Embedded(s) => Ok(s.publish(channel, message)),
357 Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
358 Reply::Int(n) if n >= 0 => Ok(n as usize),
359 Reply::Error(e) => Err(io::Error::other(string(e))),
360 other => Err(unexpected(other)),
361 },
362 }
363 }
364}
365
/// Builds a two-element argument list — `verb` followed by `a` — where each
/// element is an owned copy of the input bytes.
pub(crate) fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
    [verb, a].iter().map(|part| part.to_vec()).collect()
}
373
/// Builds a three-element argument list — `verb`, then `a`, then `b` — where
/// each element is an owned copy of the input bytes.
pub(crate) fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
    [verb, a, b].iter().map(|part| part.to_vec()).collect()
}
377
/// Converts raw bytes to a `String`, replacing invalid UTF-8 sequences with
/// U+FFFD.
///
/// Valid UTF-8 input reuses the incoming allocation; only invalid input goes
/// through the lossy conversion, which builds a new string.
pub(crate) fn string(b: Vec<u8>) -> String {
    match String::from_utf8(b) {
        Ok(text) => text,
        Err(err) => String::from_utf8_lossy(err.as_bytes()).into_owned(),
    }
}
381
382pub(crate) fn unexpected(r: Reply) -> io::Error {
383 let kind = match r {
384 Reply::Simple(_) => "simple-string",
385 Reply::Error(_) => "error",
386 Reply::Int(_) => "integer",
387 Reply::Bulk(_) => "bulk-string",
388 Reply::Nil => "nil",
389 Reply::Array(_) => "array",
390 };
391 io::Error::other(format!("unexpected RESP reply variant: {kind}"))
392}
393
394pub(crate) fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
395 items
396 .into_iter()
397 .map(|r| match r {
398 Reply::Bulk(v) => Ok(v),
399 Reply::Simple(v) => Ok(v),
400 Reply::Nil => Ok(Vec::new()),
401 other => Err(unexpected(other)),
402 })
403 .collect()
404}
405
406pub(crate) fn store_err(e: kevy_embedded::StoreError) -> io::Error {
407 io::Error::other(format!("kevy-store: {e:?}"))
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    // Exercises the basic key/value API end to end on a connection opened
    // with `mem://` (presumably an anonymous in-memory embedded store —
    // confirm against `url.rs`). The steps share state and depend on order.
    #[test]
    fn embedded_mem_full_crud_round_trip() {
        let mut c = Connection::open("mem://").unwrap();
        c.ping().unwrap();

        // set / get round trip.
        c.set(b"k", b"v").unwrap();
        assert_eq!(c.get(b"k").unwrap(), Some(b"v".to_vec()));

        // Only the key that existed is counted by `del`.
        assert_eq!(c.del(&[&b"k"[..], &b"missing"[..]]).unwrap(), 1);
        assert_eq!(c.get(b"k").unwrap(), None);

        // `exists` counts the two present keys and ignores the absent one.
        c.set(b"a", b"1").unwrap();
        c.set(b"b", b"2").unwrap();
        assert_eq!(c.exists(&[&b"a"[..], &b"b"[..], &b"none"[..]]).unwrap(), 2);

        // A missing counter starts from zero.
        assert_eq!(c.incr(b"counter").unwrap(), 1);
        assert_eq!(c.incr_by(b"counter", 9).unwrap(), 10);

        // TTL lifecycle: expire sets it, persist clears it (reported as -1).
        c.set(b"timed", b"x").unwrap();
        assert!(c.expire(b"timed", Duration::from_secs(60)).unwrap());
        let ttl = c.ttl_ms(b"timed").unwrap();
        assert!((0..=60_000).contains(&ttl), "ttl_ms = {ttl}");
        assert!(c.persist(b"timed").unwrap());
        assert_eq!(c.ttl_ms(b"timed").unwrap(), -1);

        // Type names for a missing key and for a string value.
        assert_eq!(c.type_of(b"none").unwrap(), "none");
        assert_eq!(c.type_of(b"timed").unwrap(), "string");

        // `flush` empties the store.
        assert!(c.dbsize().unwrap() >= 3);
        c.flush().unwrap();
        assert_eq!(c.dbsize().unwrap(), 0);

        // Setting a value and its TTL in one call.
        c.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
            .unwrap();
        let ttl = c.ttl_ms(b"timed2").unwrap();
        assert!((0..=60_000).contains(&ttl));
    }

    // Publishing on a fresh `mem://` connection reports a count of zero.
    #[test]
    fn anonymous_mem_publish_returns_zero() {
        let mut c = Connection::open("mem://").unwrap();
        // No subscriber has been registered through this test, so the
        // expected count is 0.
        assert_eq!(c.publish(b"chan", b"hi").unwrap(), 0);
    }

    // `mset` followed by `mget` returns values in request order, with `None`
    // for a key that was never written.
    #[test]
    fn embedded_mget_mset() {
        let mut c = Connection::open("mem://").unwrap();
        c.mset(&[
            (b"a".as_ref(), b"1".as_ref()),
            (b"b".as_ref(), b"2".as_ref()),
        ])
        .unwrap();
        let got = c.mget(&[&b"a"[..], &b"b"[..], &b"missing"[..]]).unwrap();
        assert_eq!(
            got,
            vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None]
        );
    }

    // `multi` is defined outside this file (presumably in the `transaction`
    // module); on a `mem://` connection it must fail with `Unsupported`.
    #[test]
    fn embedded_multi_rejected_unsupported() {
        let mut c = Connection::open("mem://").unwrap();
        let err = c.multi().unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Unsupported);
    }
}