1#![forbid(unsafe_code)]
35#![warn(missing_docs)]
36
37use std::io;
38use std::time::Duration;
39
40use kevy_embedded::Store;
41use kevy_resp::Reply;
42use kevy_resp_client::RespClient;
43
44mod collections;
45mod scan;
46mod subscribe;
47mod subscribe_io;
48mod transaction;
49mod url;
50
51pub use subscribe::{PubsubEvent, Subscriber, SubscriberEvents, SubscriberMessages};
52pub use transaction::{Transaction, TransactionReplies};
53
54pub(crate) use url::{Target, parse_url, resolve_store};
55
/// A key-value connection backed either by an in-process store or by a
/// remote server spoken to over RESP.
///
/// Each command method dispatches on the variant: the embedded variant calls
/// directly into the wrapped [`Store`], the remote variant sends the matching
/// command through the wrapped [`RespClient`].
pub enum Connection {
    /// In-process backend; commands are served by the wrapped [`Store`].
    Embedded(Store),
    /// Network backend; commands are sent through the wrapped [`RespClient`].
    Remote(RespClient),
}
64
65impl Connection {
66 pub fn open(url: &str) -> io::Result<Self> {
74 let parsed = parse_url(url)?;
75 match parsed {
76 Target::Remote(remote_url) => Ok(Self::Remote(RespClient::from_url(&remote_url)?)),
77 embed => Ok(Self::Embedded(resolve_store(&embed)?)),
78 }
79 }
80
81 pub fn ping(&mut self) -> io::Result<()> {
84 match self {
85 Self::Embedded(_) => Ok(()),
86 Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
87 Reply::Simple(s) if s == b"PONG" => Ok(()),
88 Reply::Error(e) => Err(io::Error::other(string(e))),
89 other => Err(unexpected(other)),
90 },
91 }
92 }
93
94 pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
96 match self {
97 Self::Embedded(s) => s.set(key, value).map(|_| ()),
98 Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
99 Reply::Simple(s) if s == b"OK" => Ok(()),
100 Reply::Error(e) => Err(io::Error::other(string(e))),
101 other => Err(unexpected(other)),
102 },
103 }
104 }
105
106 pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
108 match self {
109 Self::Embedded(s) => s.get(key),
110 Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
111 Reply::Bulk(v) => Ok(Some(v)),
112 Reply::Nil => Ok(None),
113 Reply::Error(e) => Err(io::Error::other(string(e))),
114 other => Err(unexpected(other)),
115 },
116 }
117 }
118
119 pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
122 match self {
123 Self::Embedded(s) => s.del(keys),
124 Self::Remote(c) => {
125 let mut args = Vec::with_capacity(keys.len() + 1);
126 args.push(b"DEL".to_vec());
127 args.extend(keys.iter().map(|k| k.to_vec()));
128 match c.request(&args)? {
129 Reply::Int(n) if n >= 0 => Ok(n as usize),
130 Reply::Error(e) => Err(io::Error::other(string(e))),
131 other => Err(unexpected(other)),
132 }
133 }
134 }
135 }
136
137 pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
140 match self {
141 Self::Embedded(s) => s.exists(keys),
142 Self::Remote(c) => {
143 let mut args = Vec::with_capacity(keys.len() + 1);
144 args.push(b"EXISTS".to_vec());
145 args.extend(keys.iter().map(|k| k.to_vec()));
146 match c.request(&args)? {
147 Reply::Int(n) if n >= 0 => Ok(n as usize),
148 Reply::Error(e) => Err(io::Error::other(string(e))),
149 other => Err(unexpected(other)),
150 }
151 }
152 }
153 }
154
155 pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
158 match self {
159 Self::Embedded(s) => s.incr(key),
160 Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
161 Reply::Int(n) => Ok(n),
162 Reply::Error(e) => Err(io::Error::other(string(e))),
163 other => Err(unexpected(other)),
164 },
165 }
166 }
167
168 pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
170 match self {
171 Self::Embedded(s) => s.incr_by(key, delta),
172 Self::Remote(c) => {
173 let args = vec![
174 b"INCRBY".to_vec(),
175 key.to_vec(),
176 delta.to_string().into_bytes(),
177 ];
178 match c.request(&args)? {
179 Reply::Int(n) => Ok(n),
180 Reply::Error(e) => Err(io::Error::other(string(e))),
181 other => Err(unexpected(other)),
182 }
183 }
184 }
185 }
186
187 pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
189 match self {
190 Self::Embedded(s) => s.expire(key, ttl),
191 Self::Remote(c) => {
192 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
193 let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
194 match c.request(&args)? {
195 Reply::Int(1) => Ok(true),
196 Reply::Int(0) => Ok(false),
197 Reply::Error(e) => Err(io::Error::other(string(e))),
198 other => Err(unexpected(other)),
199 }
200 }
201 }
202 }
203
204 pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
206 match self {
207 Self::Embedded(s) => s.persist(key),
208 Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
209 Reply::Int(1) => Ok(true),
210 Reply::Int(0) => Ok(false),
211 Reply::Error(e) => Err(io::Error::other(string(e))),
212 other => Err(unexpected(other)),
213 },
214 }
215 }
216
217 pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
219 match self {
220 Self::Embedded(s) => Ok(s.ttl_ms(key)),
221 Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
222 Reply::Int(n) => Ok(n),
223 Reply::Error(e) => Err(io::Error::other(string(e))),
224 other => Err(unexpected(other)),
225 },
226 }
227 }
228
229 pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
233 match self {
234 Self::Embedded(s) => Ok(s.type_of(key).to_string()),
235 Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
236 Reply::Simple(s) => Ok(string(s)),
237 Reply::Error(e) => Err(io::Error::other(string(e))),
238 other => Err(unexpected(other)),
239 },
240 }
241 }
242
243 pub fn dbsize(&mut self) -> io::Result<usize> {
245 match self {
246 Self::Embedded(s) => Ok(s.dbsize()),
247 Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
248 Reply::Int(n) if n >= 0 => Ok(n as usize),
249 Reply::Error(e) => Err(io::Error::other(string(e))),
250 other => Err(unexpected(other)),
251 },
252 }
253 }
254
255 pub fn flush(&mut self) -> io::Result<()> {
258 match self {
259 Self::Embedded(s) => s.flush(),
260 Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
261 Reply::Simple(s) if s == b"OK" => Ok(()),
262 Reply::Error(e) => Err(io::Error::other(string(e))),
263 other => Err(unexpected(other)),
264 },
265 }
266 }
267
268 pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
272 match self {
273 Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
274 Self::Remote(c) => {
275 let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
276 let args = vec![
277 b"SET".to_vec(),
278 key.to_vec(),
279 value.to_vec(),
280 b"PX".to_vec(),
281 ms.to_string().into_bytes(),
282 ];
283 match c.request(&args)? {
284 Reply::Simple(s) if s == b"OK" => Ok(()),
285 Reply::Error(e) => Err(io::Error::other(string(e))),
286 other => Err(unexpected(other)),
287 }
288 }
289 }
290 }
291
292 pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
295 match self {
296 Self::Embedded(s) => keys.iter().map(|k| s.get(k)).collect(),
297 Self::Remote(c) => {
298 let mut args = Vec::with_capacity(keys.len() + 1);
299 args.push(b"MGET".to_vec());
300 args.extend(keys.iter().map(|k| k.to_vec()));
301 match c.request(&args)? {
302 Reply::Array(items) => items
303 .into_iter()
304 .map(|r| match r {
305 Reply::Bulk(v) => Ok(Some(v)),
306 Reply::Nil => Ok(None),
307 other => Err(unexpected(other)),
308 })
309 .collect(),
310 Reply::Error(e) => Err(io::Error::other(string(e))),
311 other => Err(unexpected(other)),
312 }
313 }
314 }
315 }
316
317 pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
319 match self {
320 Self::Embedded(s) => {
321 for (k, v) in pairs {
322 s.set(k, v)?;
323 }
324 Ok(())
325 }
326 Self::Remote(c) => {
327 let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
328 args.push(b"MSET".to_vec());
329 for (k, v) in pairs {
330 args.push(k.to_vec());
331 args.push(v.to_vec());
332 }
333 match c.request(&args)? {
334 Reply::Simple(s) if s == b"OK" => Ok(()),
335 Reply::Error(e) => Err(io::Error::other(string(e))),
336 other => Err(unexpected(other)),
337 }
338 }
339 }
340 }
341
342 pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
356 match self {
357 Self::Embedded(s) => Ok(s.publish(channel, message)),
358 Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
359 Reply::Int(n) if n >= 0 => Ok(n as usize),
360 Reply::Error(e) => Err(io::Error::other(string(e))),
361 other => Err(unexpected(other)),
362 },
363 }
364 }
365}
366
/// Builds a two-element command: the verb followed by one argument, each
/// copied into its own owned buffer.
pub(crate) fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
    let mut command = Vec::with_capacity(2);
    command.push(verb.to_vec());
    command.push(a.to_vec());
    command
}
374
/// Builds a three-element command: the verb followed by two arguments, each
/// copied into its own owned buffer.
pub(crate) fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
    [verb, a, b].iter().map(|part| part.to_vec()).collect()
}
378
/// Converts raw bytes to a `String`, replacing invalid UTF-8 sequences with
/// U+FFFD.
///
/// Valid UTF-8 input reuses the incoming buffer; only invalid input takes
/// the lossy path, which allocates a repaired copy.
pub(crate) fn string(b: Vec<u8>) -> String {
    match String::from_utf8(b) {
        Ok(text) => text,
        // The error hands the original bytes back, so nothing is lost before
        // the lossy decode.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
382
383pub(crate) fn unexpected(r: Reply) -> io::Error {
384 let kind = match r {
385 Reply::Simple(_) => "simple-string",
386 Reply::Error(_) => "error",
387 Reply::Int(_) => "integer",
388 Reply::Bulk(_) => "bulk-string",
389 Reply::Nil | Reply::Null => "nil",
390 Reply::Array(_) => "array",
391 Reply::Map(_) => "map",
392 Reply::Set(_) => "set",
393 Reply::Double(_) => "double",
394 Reply::Boolean(_) => "boolean",
395 Reply::Verbatim { .. } => "verbatim-string",
396 Reply::BigNumber(_) => "big-number",
397 Reply::Push(_) => "push",
398 Reply::BlobError(_) => "blob-error",
399 };
400 io::Error::other(format!("unexpected RESP reply variant: {kind}"))
401}
402
403pub(crate) fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
404 items
405 .into_iter()
406 .map(|r| match r {
407 Reply::Bulk(v) => Ok(v),
408 Reply::Simple(v) => Ok(v),
409 Reply::Nil => Ok(Vec::new()),
410 other => Err(unexpected(other)),
411 })
412 .collect()
413}
414
415pub(crate) fn store_err(e: kevy_embedded::StoreError) -> io::Error {
416 io::Error::other(format!("kevy-store: {e:?}"))
417}
418
419#[cfg(test)]
420mod tests {
421 use super::*;
422
#[test]
fn embedded_mem_full_crud_round_trip() {
    // Walks every basic command against an in-memory embedded store.
    let mut conn = Connection::open("mem://").unwrap();
    conn.ping().unwrap();

    // SET / GET / DEL: only the existing key counts towards the delete total.
    conn.set(b"k", b"v").unwrap();
    assert_eq!(conn.get(b"k").unwrap(), Some(b"v".to_vec()));
    let removed = conn.del(&[&b"k"[..], &b"missing"[..]]).unwrap();
    assert_eq!(removed, 1);
    assert_eq!(conn.get(b"k").unwrap(), None);

    // EXISTS counts only the keys that are present.
    conn.set(b"a", b"1").unwrap();
    conn.set(b"b", b"2").unwrap();
    let present = conn.exists(&[&b"a"[..], &b"b"[..], &b"none"[..]]).unwrap();
    assert_eq!(present, 2);

    // INCR starts a missing counter at 1; INCRBY adds on top.
    assert_eq!(conn.incr(b"counter").unwrap(), 1);
    assert_eq!(conn.incr_by(b"counter", 9).unwrap(), 10);

    // EXPIRE / PTTL / PERSIST: -1 is reported once the TTL is removed.
    conn.set(b"timed", b"x").unwrap();
    assert!(conn.expire(b"timed", Duration::from_secs(60)).unwrap());
    let ttl = conn.ttl_ms(b"timed").unwrap();
    assert!((0..=60_000).contains(&ttl), "ttl_ms = {ttl}");
    assert!(conn.persist(b"timed").unwrap());
    assert_eq!(conn.ttl_ms(b"timed").unwrap(), -1);

    // TYPE for a missing key and for a string key.
    assert_eq!(conn.type_of(b"none").unwrap(), "none");
    assert_eq!(conn.type_of(b"timed").unwrap(), "string");

    // DBSIZE before and after a flush.
    assert!(conn.dbsize().unwrap() >= 3);
    conn.flush().unwrap();
    assert_eq!(conn.dbsize().unwrap(), 0);

    // SET with a TTL in one call.
    conn.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
        .unwrap();
    let ttl = conn.ttl_ms(b"timed2").unwrap();
    assert!((0..=60_000).contains(&ttl));
}
463
#[test]
fn anonymous_mem_publish_returns_zero() {
    // Publishing on a fresh in-memory connection must report zero.
    let mut conn = Connection::open("mem://").unwrap();
    let delivered = conn.publish(b"chan", b"hi").unwrap();
    assert_eq!(delivered, 0);
}
470
#[test]
fn embedded_mget_mset() {
    // MSET two pairs, then MGET them plus one key that was never written.
    let mut conn = Connection::open("mem://").unwrap();
    conn.mset(&[(&b"a"[..], &b"1"[..]), (&b"b"[..], &b"2"[..])])
        .unwrap();

    let fetched = conn
        .mget(&[&b"a"[..], &b"b"[..], &b"missing"[..]])
        .unwrap();
    let expected = vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None];
    assert_eq!(fetched, expected);
}
485
#[test]
fn embedded_multi_rejected_unsupported() {
    // Starting a transaction on the embedded backend must fail with
    // `ErrorKind::Unsupported`.
    let mut conn = Connection::open("mem://").unwrap();
    let failure = conn.multi().unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::Unsupported);
}
492}