Skip to main content

kevy_resp_client/
lib.rs

1//! Blocking RESP2 client over `TcpStream`.
2//!
3//! [`RespClient::connect`] opens a TCP connection (with `TCP_NODELAY`);
4//! [`RespClient::request`] writes one command and blocks until exactly one
5//! reply is parsed. Works against any RESP2 server — kevy, valkey, redis.
6//!
7//! [`RespClient::from_url`] is the URL-string entry point and accepts
8//! `kevy://` (kevy-native alias), `redis://` (standard), and `tcp://`
9//! (plain host:port — no leading SELECT round-trip):
10//!
11//! ```no_run
12//! # use kevy_resp_client::RespClient;
13//! let _ = RespClient::from_url("kevy://localhost:6379")?;     // alias of redis://
14//! let _ = RespClient::from_url("kevy://localhost:6379/0")?;   // also issues SELECT 0
15//! let _ = RespClient::from_url("redis://10.0.0.5:6379")?;
16//! let _ = RespClient::from_url("tcp://kevy.internal:6379")?;
17//! # Ok::<(), std::io::Error>(())
18//! ```
19//!
20//! Single-threaded; one client per thread. Holds an incremental read buffer
21//! so multi-segment replies reassemble across `read` calls.
22//!
23//! Pure Rust, only deps are `std` + [`kevy_resp`].
24//!
25//! # Example
26//!
27//! ```no_run
28//! use kevy_resp_client::RespClient;
29//!
30//! let mut c = RespClient::connect("127.0.0.1", 6379)?;
31//! let reply = c.request(&[b"PING".to_vec()])?;
32//! println!("{reply:?}");
33//! # Ok::<(), std::io::Error>(())
34//! ```
35
36#![forbid(unsafe_code)]
37#![warn(missing_docs)]
38
39pub use kevy_resp::Reply;
40use kevy_resp::{encode_command, parse_reply};
41use std::io::{self, Read, Write};
42use std::net::TcpStream;
43
/// A blocking RESP2 connection over `TcpStream`.
///
/// Owns the socket together with a buffer of bytes that have been received
/// but not yet consumed as a complete reply, so a reply that arrives split
/// across several `read` calls can be reassembled.
///
/// Sending a request needs `&mut self`, so a client serves one caller at a
/// time: use one client per thread, or guard it with external
/// synchronisation.
#[derive(Debug)]
pub struct RespClient {
    /// The connected socket.
    stream: TcpStream,
    /// Bytes read from the socket that have not yet been parsed into a reply.
    buf: Vec<u8>,
}
52
53impl RespClient {
54    /// Connect to `host:port`, enabling `TCP_NODELAY` (best-effort).
55    pub fn connect(host: &str, port: u16) -> io::Result<Self> {
56        let stream = TcpStream::connect((host, port))?;
57        stream.set_nodelay(true).ok();
58        Ok(Self {
59            stream,
60            buf: Vec::with_capacity(8192),
61        })
62    }
63
64    /// Send one command (`args` is RESP-encoded as a multibulk array) and
65    /// block until exactly one reply is parsed. Returns the parsed [`Reply`].
66    pub fn request(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
67        let mut out = Vec::new();
68        encode_command(&mut out, args);
69        self.stream.write_all(&out)?;
70
71        let mut chunk = [0u8; 8192];
72        loop {
73            match parse_reply(&self.buf) {
74                Ok(Some((reply, used))) => {
75                    self.buf.drain(..used);
76                    return Ok(reply);
77                }
78                Ok(None) => {}
79                Err(_) => {
80                    return Err(io::Error::new(
81                        io::ErrorKind::InvalidData,
82                        "malformed reply",
83                    ));
84                }
85            }
86            let n = self.stream.read(&mut chunk)?;
87            if n == 0 {
88                return Err(io::Error::new(
89                    io::ErrorKind::UnexpectedEof,
90                    "server closed connection",
91                ));
92            }
93            self.buf.extend_from_slice(&chunk[..n]);
94        }
95    }
96
97    /// Connect from a URL string.
98    ///
99    /// Accepted schemes (all wire-protocol identical — RESP2 over TCP):
100    /// - `kevy://host[:port][/db]` — kevy-native alias of `redis://`.
101    /// - `redis://host[:port][/db]` — standard Redis URL (every official
102    ///   client lib speaks this).
103    /// - `tcp://host[:port]` — plain TCP with no leading SELECT round-trip.
104    ///
105    /// Auth and TLS schemes (`redis://user:pass@…`, `rediss://`) are NOT
106    /// supported — kevy itself ships without AUTH/TLS. Including a userinfo
107    /// component or using `rediss://` returns [`io::ErrorKind::Unsupported`].
108    ///
109    /// If a `/db` path segment is present, an explicit `SELECT <db>` is
110    /// issued before returning the client. For non-zero indices kevy will
111    /// reply with its "only supports DB 0" error and `from_url` propagates
112    /// that as [`io::ErrorKind::Other`].
113    pub fn from_url(url: &str) -> io::Result<Self> {
114        let parsed = parse_url(url)?;
115        let mut client = Self::connect(&parsed.host, parsed.port)?;
116        if let Some(db) = parsed.db {
117            let reply = client.request(&[b"SELECT".to_vec(), db.to_string().into_bytes()])?;
118            if let Reply::Error(msg) = reply {
119                let text = String::from_utf8_lossy(&msg);
120                return Err(io::Error::other(format!("SELECT {db} rejected: {text}")));
121            }
122        }
123        Ok(client)
124    }
125}
126
/// The pieces of a connection URL this crate acts on: where to connect and
/// which database, if any, to select.
///
/// Hand-rolled on purpose — a full URL crate would be a crates.io
/// dependency, which the zero-dependency charter rules out.
#[derive(Debug, Eq, PartialEq)]
struct ParsedUrl {
    /// Host taken from the URL authority.
    host: String,
    /// TCP port taken from the URL authority.
    port: u16,
    /// Database index from the URL path; `None` when no index was given.
    db: Option<u32>,
}
135
136fn parse_url(url: &str) -> io::Result<ParsedUrl> {
137    let (scheme, rest) = split_scheme(url)?;
138    if rest.contains('@') {
139        return Err(io::Error::new(
140            io::ErrorKind::Unsupported,
141            "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
142        ));
143    }
144    let (authority, path) = match rest.split_once('/') {
145        Some((auth, p)) => (auth, Some(p)),
146        None => (rest, None),
147    };
148    let (host, port) = parse_authority(authority)?;
149    let db = parse_db_path(scheme, path)?;
150    Ok(ParsedUrl { host, port, db })
151}
152
/// Split `url` at its first `://` and check the scheme.
///
/// On success returns `(scheme, rest)`, where `rest` is everything after
/// `://`. Only `kevy`, `redis` and `tcp` are accepted. The TLS variants
/// `rediss` and `kevys` fail with [`io::ErrorKind::Unsupported`]; a missing
/// `://` or any other scheme fails with [`io::ErrorKind::InvalidInput`].
fn split_scheme(url: &str) -> io::Result<(&str, &str)> {
    let Some((scheme, rest)) = url.split_once("://") else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "URL missing '://'",
        ));
    };

    if matches!(scheme, "kevy" | "redis" | "tcp") {
        return Ok((scheme, rest));
    }

    let failure = if matches!(scheme, "rediss" | "kevys") {
        io::Error::new(
            io::ErrorKind::Unsupported,
            "TLS schemes (rediss://, kevys://) are unsupported — kevy has no TLS",
        )
    } else {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unknown URL scheme '{scheme}://'"),
        )
    };
    Err(failure)
}
172
/// Parse `host[:port]` — defaulting to port 6379 (Redis convention)
/// when no port is given. Empty hosts are rejected.
///
/// Two host forms are understood:
/// - `name[:port]` — the text after the *last* `:` is the port.
/// - `[v6addr][:port]` — a bracketed IPv6 literal such as `[::1]:6379`.
///   The brackets are stripped from the returned host so it can be handed
///   straight to `TcpStream::connect`, and the colons inside the address
///   are never mistaken for the port separator.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] for an empty host, a port that
/// is not a valid `u16`, an unterminated `[`, bracket contents that are not
/// an IPv6 address, or stray text between `]` and the port.
fn parse_authority(authority: &str) -> io::Result<(String, u16)> {
    let invalid = |msg: String| io::Error::new(io::ErrorKind::InvalidInput, msg);

    let (host, port_text) = match authority.strip_prefix('[') {
        // Bracketed IPv6 literal: the host ends at ']', not at the last ':'.
        Some(bracketed) => {
            let (addr, tail) = bracketed
                .split_once(']')
                .ok_or_else(|| invalid(format!("missing ']' in host: {authority}")))?;
            if addr.parse::<std::net::Ipv6Addr>().is_err() {
                return Err(invalid(format!("bad IPv6 address: {addr}")));
            }
            let port_text = if tail.is_empty() {
                None
            } else {
                // Anything after ']' must be ":port".
                let p = tail
                    .strip_prefix(':')
                    .ok_or_else(|| invalid(format!("unexpected text after ']': {tail}")))?;
                Some(p)
            };
            (addr, port_text)
        }
        None => match authority.rsplit_once(':') {
            Some((h, p)) => (h, Some(p)),
            None => (authority, None),
        },
    };

    // The port is validated before the host, so ":abc" reports the bad port.
    let port = match port_text {
        Some(p) => p
            .parse::<u16>()
            .map_err(|_| invalid(format!("bad port: {p}")))?,
        None => 6379,
    };
    if host.is_empty() {
        return Err(invalid("empty host".to_string()));
    }
    Ok((host.to_string(), port))
}
190
/// Turn the URL path into an optional database index.
///
/// A missing or empty path means "no index" for every scheme. A non-empty
/// path is an error under `tcp://` (the raw-socket form carries no db);
/// under the other schemes it must parse as a `u32`.
fn parse_db_path(scheme: &str, path: Option<&str>) -> io::Result<Option<u32>> {
    // `None` and `Some("")` both mean there is nothing to select.
    let Some(segment) = path.filter(|p| !p.is_empty()) else {
        return Ok(None);
    };

    if scheme == "tcp" {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("tcp:// URL must not have a path: '/{segment}'"),
        ));
    }

    segment.parse::<u32>().map(Some).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("bad db index: '{segment}' (expected a non-negative integer)"),
        )
    })
}
211
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `url`, panicking with the URL and the error if it is rejected.
    fn parse(url: &str) -> ParsedUrl {
        match parse_url(url) {
            Ok(parsed) => parsed,
            Err(e) => panic!("{url}: {e}"),
        }
    }

    /// Parse `url`, expecting a rejection, and return the error's kind.
    fn rejection_kind(url: &str) -> io::ErrorKind {
        parse_url(url).unwrap_err().kind()
    }

    #[test]
    fn kevy_redis_tcp_schemes_all_resolve() {
        for scheme in ["kevy", "redis", "tcp"] {
            let parsed = parse(&format!("{scheme}://localhost:6379"));
            assert_eq!(parsed.host, "localhost");
            assert_eq!(parsed.port, 6379);
            assert_eq!(parsed.db, None);
        }
    }

    #[test]
    fn default_port_is_6379_when_omitted() {
        let parsed = parse("kevy://example.com");
        assert_eq!((parsed.host.as_str(), parsed.port), ("example.com", 6379));
    }

    #[test]
    fn db_path_segment_parsed() {
        let cases: [(&str, Option<u32>); 4] = [
            ("kevy://h:1/0", Some(0)),
            ("redis://h:1/3", Some(3)),
            ("kevy://h", None),
            ("kevy://h/", None),
        ];
        for (url, expected) in cases {
            assert_eq!(parse(url).db, expected);
        }
    }

    #[test]
    fn tls_schemes_rejected() {
        for url in ["rediss://h:6379", "kevys://h:6379"] {
            assert_eq!(rejection_kind(url), io::ErrorKind::Unsupported);
        }
    }

    #[test]
    fn auth_userinfo_rejected() {
        assert_eq!(
            rejection_kind("kevy://user:pass@h:6379"),
            io::ErrorKind::Unsupported
        );
    }

    #[test]
    fn unknown_scheme_rejected() {
        assert_eq!(
            rejection_kind("memcached://h:11211"),
            io::ErrorKind::InvalidInput
        );
    }

    #[test]
    fn missing_scheme_rejected() {
        assert!(parse_url("localhost:6379").is_err());
    }

    #[test]
    fn tcp_with_path_rejected() {
        // tcp:// is the raw-socket form; a db index only has meaning under
        // the redis/kevy schemes.
        assert!(parse_url("tcp://h:6379/0").is_err());
    }

    #[test]
    fn bad_port_rejected() {
        // The second case exceeds u16::MAX.
        for url in ["kevy://h:notaport", "kevy://h:99999"] {
            assert!(parse_url(url).is_err());
        }
    }

    #[test]
    fn bad_db_rejected() {
        for url in ["kevy://h/abc", "kevy://h/-1"] {
            assert!(parse_url(url).is_err());
        }
    }

    #[test]
    fn empty_host_rejected() {
        assert!(parse_url("kevy://:6379").is_err());
    }
}