1#![forbid(unsafe_code)]
37#![warn(missing_docs)]
38
39pub use kevy_resp::Reply;
40use kevy_resp::{encode_command, parse_reply};
41use std::io::{self, Read, Write};
42use std::net::TcpStream;
43
44pub struct RespClient {
49 stream: TcpStream,
50 buf: Vec<u8>,
51}
52
53impl RespClient {
54 pub fn connect(host: &str, port: u16) -> io::Result<Self> {
56 let stream = TcpStream::connect((host, port))?;
57 stream.set_nodelay(true).ok();
58 Ok(Self {
59 stream,
60 buf: Vec::with_capacity(8192),
61 })
62 }
63
64 pub fn request(&mut self, args: &[Vec<u8>]) -> io::Result<Reply> {
67 let mut out = Vec::new();
68 encode_command(&mut out, args);
69 self.stream.write_all(&out)?;
70
71 let mut chunk = [0u8; 8192];
72 loop {
73 match parse_reply(&self.buf) {
74 Ok(Some((reply, used))) => {
75 self.buf.drain(..used);
76 return Ok(reply);
77 }
78 Ok(None) => {}
79 Err(_) => {
80 return Err(io::Error::new(
81 io::ErrorKind::InvalidData,
82 "malformed reply",
83 ));
84 }
85 }
86 let n = self.stream.read(&mut chunk)?;
87 if n == 0 {
88 return Err(io::Error::new(
89 io::ErrorKind::UnexpectedEof,
90 "server closed connection",
91 ));
92 }
93 self.buf.extend_from_slice(&chunk[..n]);
94 }
95 }
96
97 pub fn from_url(url: &str) -> io::Result<Self> {
114 let parsed = parse_url(url)?;
115 let mut client = Self::connect(&parsed.host, parsed.port)?;
116 if let Some(db) = parsed.db {
117 let reply = client.request(&[b"SELECT".to_vec(), db.to_string().into_bytes()])?;
118 if let Reply::Error(msg) = reply {
119 let text = String::from_utf8_lossy(&msg);
120 return Err(io::Error::other(format!("SELECT {db} rejected: {text}")));
121 }
122 }
123 Ok(client)
124 }
125}
126
127#[derive(Debug, PartialEq, Eq)]
130struct ParsedUrl {
131 host: String,
132 port: u16,
133 db: Option<u32>,
134}
135
136fn parse_url(url: &str) -> io::Result<ParsedUrl> {
137 let (scheme, rest) = split_scheme(url)?;
138 if rest.contains('@') {
139 return Err(io::Error::new(
140 io::ErrorKind::Unsupported,
141 "userinfo (user:pass@host) is unsupported — kevy has no AUTH",
142 ));
143 }
144 let (authority, path) = match rest.split_once('/') {
145 Some((auth, p)) => (auth, Some(p)),
146 None => (rest, None),
147 };
148 let (host, port) = parse_authority(authority)?;
149 let db = parse_db_path(scheme, path)?;
150 Ok(ParsedUrl { host, port, db })
151}
152
153fn split_scheme(url: &str) -> io::Result<(&str, &str)> {
157 let (scheme, rest) = url
158 .split_once("://")
159 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "URL missing '://'"))?;
160 match scheme {
161 "kevy" | "redis" | "tcp" => Ok((scheme, rest)),
162 "rediss" | "kevys" => Err(io::Error::new(
163 io::ErrorKind::Unsupported,
164 "TLS schemes (rediss://, kevys://) are unsupported — kevy has no TLS",
165 )),
166 other => Err(io::Error::new(
167 io::ErrorKind::InvalidInput,
168 format!("unknown URL scheme '{other}://'"),
169 )),
170 }
171}
172
173fn parse_authority(authority: &str) -> io::Result<(String, u16)> {
176 let (host, port) = match authority.rsplit_once(':') {
177 Some((h, p)) => {
178 let port: u16 = p.parse().map_err(|_| {
179 io::Error::new(io::ErrorKind::InvalidInput, format!("bad port: {p}"))
180 })?;
181 (h.to_string(), port)
182 }
183 None => (authority.to_string(), 6379),
184 };
185 if host.is_empty() {
186 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty host"));
187 }
188 Ok((host, port))
189}
190
191fn parse_db_path(scheme: &str, path: Option<&str>) -> io::Result<Option<u32>> {
194 match path {
195 None | Some("") => Ok(None),
196 Some(p) if scheme == "tcp" => Err(io::Error::new(
197 io::ErrorKind::InvalidInput,
198 format!("tcp:// URL must not have a path: '/{p}'"),
199 )),
200 Some(p) => {
201 let n: u32 = p.parse().map_err(|_| {
202 io::Error::new(
203 io::ErrorKind::InvalidInput,
204 format!("bad db index: '{p}' (expected a non-negative integer)"),
205 )
206 })?;
207 Ok(Some(n))
208 }
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215
216 fn parse(u: &str) -> ParsedUrl {
217 parse_url(u).unwrap_or_else(|e| panic!("{u}: {e}"))
218 }
219
220 #[test]
221 fn kevy_redis_tcp_schemes_all_resolve() {
222 for url in [
223 "kevy://localhost:6379",
224 "redis://localhost:6379",
225 "tcp://localhost:6379",
226 ] {
227 let p = parse(url);
228 assert_eq!(p.host, "localhost");
229 assert_eq!(p.port, 6379);
230 assert_eq!(p.db, None);
231 }
232 }
233
234 #[test]
235 fn default_port_is_6379_when_omitted() {
236 let p = parse("kevy://example.com");
237 assert_eq!(p.host, "example.com");
238 assert_eq!(p.port, 6379);
239 }
240
241 #[test]
242 fn db_path_segment_parsed() {
243 assert_eq!(parse("kevy://h:1/0").db, Some(0));
244 assert_eq!(parse("redis://h:1/3").db, Some(3));
245 assert_eq!(parse("kevy://h").db, None);
246 assert_eq!(parse("kevy://h/").db, None);
247 }
248
249 #[test]
250 fn tls_schemes_rejected() {
251 let err = parse_url("rediss://h:6379").unwrap_err();
252 assert_eq!(err.kind(), io::ErrorKind::Unsupported);
253 let err = parse_url("kevys://h:6379").unwrap_err();
254 assert_eq!(err.kind(), io::ErrorKind::Unsupported);
255 }
256
257 #[test]
258 fn auth_userinfo_rejected() {
259 let err = parse_url("kevy://user:pass@h:6379").unwrap_err();
260 assert_eq!(err.kind(), io::ErrorKind::Unsupported);
261 }
262
263 #[test]
264 fn unknown_scheme_rejected() {
265 let err = parse_url("memcached://h:11211").unwrap_err();
266 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
267 }
268
269 #[test]
270 fn missing_scheme_rejected() {
271 assert!(parse_url("localhost:6379").is_err());
272 }
273
274 #[test]
275 fn tcp_with_path_rejected() {
276 assert!(parse_url("tcp://h:6379/0").is_err());
279 }
280
281 #[test]
282 fn bad_port_rejected() {
283 assert!(parse_url("kevy://h:notaport").is_err());
284 assert!(parse_url("kevy://h:99999").is_err()); }
286
287 #[test]
288 fn bad_db_rejected() {
289 assert!(parse_url("kevy://h/abc").is_err());
290 assert!(parse_url("kevy://h/-1").is_err());
291 }
292
293 #[test]
294 fn empty_host_rejected() {
295 assert!(parse_url("kevy://:6379").is_err());
296 }
297}