Skip to main content

pylon_runtime/
resp_server.rs

1//! RESP-compatible TCP server for the pylon cache.
2//!
3//! Speaks the Redis wire protocol (RESP2), so any `redis-cli` or Redis client
4//! library can talk directly to the pylon cache without HTTP overhead.
5//!
6//! # Supported commands
7//!
8//! Strings: GET, SET, DEL, EXISTS, INCR, DECR, INCRBY, SETNX, GETSET, MGET, MSET
9//! TTL:     EXPIRE, PERSIST, TTL
10//! Lists:   LPUSH, RPUSH, LPOP, RPOP, LRANGE, LLEN
11//! Sets:    SADD, SREM, SMEMBERS, SISMEMBER, SCARD, SINTER, SUNION
12//! Hashes:  HSET, HGET, HDEL, HGETALL, HEXISTS, HLEN, HKEYS, HINCRBY
13//! Sorted:  ZADD, ZREM, ZSCORE, ZRANK, ZRANGE, ZCARD
14//! Keys:    KEYS, TYPE, DBSIZE, FLUSHALL, FLUSHDB
15//! Conn:    PING, ECHO, QUIT, COMMAND, INFO
16
17use std::io::{BufReader, Write};
18use std::net::{TcpListener, TcpStream};
19use std::sync::Arc;
20use std::thread;
21
22use pylon_plugin::builtin::cache::CachePlugin;
23
24use crate::resp::{parse_resp, RespValue};
25
26/// Start a RESP-compatible server (Redis protocol) on the given port.
27///
28/// This blocks the calling thread. Each client connection is handled in its
29/// own thread with a synchronous read loop.
30pub fn start_resp_server(cache: Arc<CachePlugin>, port: u16) {
31    let addr = format!("0.0.0.0:{port}");
32    let listener = match TcpListener::bind(&addr) {
33        Ok(l) => l,
34        Err(e) => {
35            tracing::warn!("[resp] Failed to bind RESP server on {addr}: {e}");
36            return;
37        }
38    };
39
40    tracing::warn!("[resp] RESP server listening on resp://localhost:{port}");
41    tracing::warn!("[resp] Compatible with redis-cli: redis-cli -p {port}");
42
43    for stream in listener.incoming() {
44        let stream = match stream {
45            Ok(s) => s,
46            Err(_) => continue,
47        };
48
49        let cache = Arc::clone(&cache);
50        thread::spawn(move || {
51            handle_client(cache, stream);
52        });
53    }
54}
55
56fn handle_client(cache: Arc<CachePlugin>, stream: TcpStream) {
57    let write_stream = match stream.try_clone() {
58        Ok(s) => s,
59        Err(_) => return,
60    };
61    let mut reader = BufReader::new(stream);
62    let mut writer = write_stream;
63
64    loop {
65        let value = match parse_resp(&mut reader) {
66            Ok(v) => v,
67            Err(_) => break, // Client disconnected or protocol error
68        };
69
70        // Commands arrive as arrays: ["SET", "key", "value"]
71        let args = match value {
72            RespValue::Array(Some(items)) => items,
73            _ => {
74                let _ = writer.write_all(&RespValue::err("Expected array command").serialize());
75                continue;
76            }
77        };
78
79        let cmd_parts: Vec<String> = args
80            .iter()
81            .filter_map(|v| match v {
82                RespValue::BulkString(Some(s)) => Some(s.clone()),
83                RespValue::SimpleString(s) => Some(s.clone()),
84                _ => None,
85            })
86            .collect();
87
88        if cmd_parts.is_empty() {
89            let _ = writer.write_all(&RespValue::err("Empty command").serialize());
90            continue;
91        }
92
93        let response = execute_command(&cache, &cmd_parts);
94        let _ = writer.write_all(&response.serialize());
95        let _ = writer.flush();
96
97        // QUIT: send OK then close.
98        if cmd_parts[0].eq_ignore_ascii_case("QUIT") {
99            break;
100        }
101    }
102}
103
104/// Execute a Redis command against the CachePlugin.
105fn execute_command(cache: &CachePlugin, args: &[String]) -> RespValue {
106    let cmd = args[0].to_uppercase();
107
108    match cmd.as_str() {
109        // -----------------------------------------------------------------
110        // Connection
111        // -----------------------------------------------------------------
112        "PING" => {
113            if args.len() > 1 {
114                RespValue::bulk(&args[1])
115            } else {
116                RespValue::SimpleString("PONG".into())
117            }
118        }
119        "ECHO" => {
120            if args.len() < 2 {
121                return RespValue::err("wrong number of arguments for 'echo' command");
122            }
123            RespValue::bulk(&args[1])
124        }
125        "QUIT" => RespValue::ok(),
126        "COMMAND" => RespValue::ok(), // redis-cli sends this on connect
127
128        // -----------------------------------------------------------------
129        // Strings
130        // -----------------------------------------------------------------
131        "SET" => {
132            if args.len() < 3 {
133                return RespValue::err("wrong number of arguments for 'set' command");
134            }
135
136            // Parse optional flags: EX seconds, PX milliseconds, NX, XX.
137            let mut ttl: Option<u64> = None;
138            let mut nx = false;
139            let mut xx = false;
140            let mut i = 3;
141            while i < args.len() {
142                match args[i].to_uppercase().as_str() {
143                    "EX" => {
144                        i += 1;
145                        ttl = args.get(i).and_then(|v| v.parse::<u64>().ok());
146                    }
147                    "PX" => {
148                        i += 1;
149                        ttl = args.get(i).and_then(|v| v.parse::<u64>().ok()).map(|ms| {
150                            // Convert ms to seconds, rounding up so sub-second TTLs
151                            // still expire rather than becoming zero (infinite).
152                            if ms == 0 {
153                                0
154                            } else {
155                                (ms + 999) / 1000
156                            }
157                        });
158                    }
159                    "NX" => nx = true,
160                    "XX" => xx = true,
161                    _ => {}
162                }
163                i += 1;
164            }
165
166            if nx {
167                if cache.setnx(&args[1], &args[2], ttl) {
168                    RespValue::ok()
169                } else {
170                    RespValue::null()
171                }
172            } else if xx {
173                if cache.exists(&args[1]) {
174                    cache.set(&args[1], &args[2], ttl);
175                    RespValue::ok()
176                } else {
177                    RespValue::null()
178                }
179            } else {
180                cache.set(&args[1], &args[2], ttl);
181                RespValue::ok()
182            }
183        }
184        "GET" => {
185            if args.len() < 2 {
186                return RespValue::err("wrong number of arguments for 'get' command");
187            }
188            match cache.get(&args[1]) {
189                Some(v) => RespValue::bulk(&v),
190                None => RespValue::null(),
191            }
192        }
193        "DEL" => {
194            if args.len() < 2 {
195                return RespValue::err("wrong number of arguments for 'del' command");
196            }
197            let mut count = 0i64;
198            for key in &args[1..] {
199                if cache.del(key) {
200                    count += 1;
201                }
202            }
203            RespValue::int(count)
204        }
205        "EXISTS" => {
206            if args.len() < 2 {
207                return RespValue::err("wrong number of arguments for 'exists' command");
208            }
209            let mut count = 0i64;
210            for key in &args[1..] {
211                if cache.exists(key) {
212                    count += 1;
213                }
214            }
215            RespValue::int(count)
216        }
217        "INCR" => {
218            if args.len() < 2 {
219                return RespValue::err("wrong number of arguments for 'incr' command");
220            }
221            match cache.incr(&args[1]) {
222                Ok(n) => RespValue::int(n),
223                Err(e) => RespValue::err(&e),
224            }
225        }
226        "DECR" => {
227            if args.len() < 2 {
228                return RespValue::err("wrong number of arguments for 'decr' command");
229            }
230            match cache.decr(&args[1]) {
231                Ok(n) => RespValue::int(n),
232                Err(e) => RespValue::err(&e),
233            }
234        }
235        "INCRBY" => {
236            if args.len() < 3 {
237                return RespValue::err("wrong number of arguments for 'incrby' command");
238            }
239            let amount: i64 = match args[2].parse() {
240                Ok(n) => n,
241                Err(_) => return RespValue::err("value is not an integer or out of range"),
242            };
243            match cache.incrby(&args[1], amount) {
244                Ok(n) => RespValue::int(n),
245                Err(e) => RespValue::err(&e),
246            }
247        }
248        "SETNX" => {
249            if args.len() < 3 {
250                return RespValue::err("wrong number of arguments for 'setnx' command");
251            }
252            let set = cache.setnx(&args[1], &args[2], None);
253            RespValue::int(if set { 1 } else { 0 })
254        }
255        "GETSET" => {
256            if args.len() < 3 {
257                return RespValue::err("wrong number of arguments for 'getset' command");
258            }
259            match cache.getset(&args[1], &args[2]) {
260                Some(v) => RespValue::bulk(&v),
261                None => RespValue::null(),
262            }
263        }
264        "MGET" => {
265            if args.len() < 2 {
266                return RespValue::err("wrong number of arguments for 'mget' command");
267            }
268            let keys: Vec<&str> = args[1..].iter().map(|s| s.as_str()).collect();
269            let values = cache.mget(&keys);
270            RespValue::array(
271                values
272                    .into_iter()
273                    .map(|v| match v {
274                        Some(s) => RespValue::bulk(&s),
275                        None => RespValue::null(),
276                    })
277                    .collect(),
278            )
279        }
280        "MSET" => {
281            if args.len() < 3 || (args.len() - 1) % 2 != 0 {
282                return RespValue::err("wrong number of arguments for 'mset' command");
283            }
284            let mut pairs = Vec::new();
285            let mut i = 1;
286            while i < args.len() - 1 {
287                pairs.push((args[i].as_str(), args[i + 1].as_str()));
288                i += 2;
289            }
290            cache.mset(&pairs);
291            RespValue::ok()
292        }
293
294        // -----------------------------------------------------------------
295        // TTL
296        // -----------------------------------------------------------------
297        "EXPIRE" => {
298            if args.len() < 3 {
299                return RespValue::err("wrong number of arguments for 'expire' command");
300            }
301            let secs: u64 = match args[2].parse() {
302                Ok(n) => n,
303                Err(_) => return RespValue::err("value is not an integer or out of range"),
304            };
305            RespValue::int(if cache.expire(&args[1], secs) { 1 } else { 0 })
306        }
307        "PERSIST" => {
308            if args.len() < 2 {
309                return RespValue::err("wrong number of arguments for 'persist' command");
310            }
311            RespValue::int(if cache.persist(&args[1]) { 1 } else { 0 })
312        }
313        "TTL" => {
314            if args.len() < 2 {
315                return RespValue::err("wrong number of arguments for 'ttl' command");
316            }
317            RespValue::int(cache.ttl(&args[1]))
318        }
319
320        // -----------------------------------------------------------------
321        // Lists
322        // -----------------------------------------------------------------
323        "LPUSH" => {
324            if args.len() < 3 {
325                return RespValue::err("wrong number of arguments for 'lpush' command");
326            }
327            let mut len = 0;
328            for val in &args[2..] {
329                len = cache.lpush(&args[1], val);
330            }
331            RespValue::int(len as i64)
332        }
333        "RPUSH" => {
334            if args.len() < 3 {
335                return RespValue::err("wrong number of arguments for 'rpush' command");
336            }
337            let mut len = 0;
338            for val in &args[2..] {
339                len = cache.rpush(&args[1], val);
340            }
341            RespValue::int(len as i64)
342        }
343        "LPOP" => {
344            if args.len() < 2 {
345                return RespValue::err("wrong number of arguments for 'lpop' command");
346            }
347            match cache.lpop(&args[1]) {
348                Some(v) => RespValue::bulk(&v),
349                None => RespValue::null(),
350            }
351        }
352        "RPOP" => {
353            if args.len() < 2 {
354                return RespValue::err("wrong number of arguments for 'rpop' command");
355            }
356            match cache.rpop(&args[1]) {
357                Some(v) => RespValue::bulk(&v),
358                None => RespValue::null(),
359            }
360        }
361        "LRANGE" => {
362            if args.len() < 4 {
363                return RespValue::err("wrong number of arguments for 'lrange' command");
364            }
365            let start: i64 = args[2].parse().unwrap_or(0);
366            let stop: i64 = args[3].parse().unwrap_or(-1);
367            let items = cache.lrange(&args[1], start, stop);
368            RespValue::array(items.into_iter().map(|s| RespValue::bulk(&s)).collect())
369        }
370        "LLEN" => {
371            if args.len() < 2 {
372                return RespValue::err("wrong number of arguments for 'llen' command");
373            }
374            RespValue::int(cache.llen(&args[1]) as i64)
375        }
376
377        // -----------------------------------------------------------------
378        // Sets
379        // -----------------------------------------------------------------
380        "SADD" => {
381            if args.len() < 3 {
382                return RespValue::err("wrong number of arguments for 'sadd' command");
383            }
384            let mut added = 0i64;
385            for member in &args[2..] {
386                if cache.sadd(&args[1], member) {
387                    added += 1;
388                }
389            }
390            RespValue::int(added)
391        }
392        "SREM" => {
393            if args.len() < 3 {
394                return RespValue::err("wrong number of arguments for 'srem' command");
395            }
396            let mut removed = 0i64;
397            for member in &args[2..] {
398                if cache.srem(&args[1], member) {
399                    removed += 1;
400                }
401            }
402            RespValue::int(removed)
403        }
404        "SMEMBERS" => {
405            if args.len() < 2 {
406                return RespValue::err("wrong number of arguments for 'smembers' command");
407            }
408            let members = cache.smembers(&args[1]);
409            RespValue::array(members.into_iter().map(|s| RespValue::bulk(&s)).collect())
410        }
411        "SISMEMBER" => {
412            if args.len() < 3 {
413                return RespValue::err("wrong number of arguments for 'sismember' command");
414            }
415            RespValue::int(if cache.sismember(&args[1], &args[2]) {
416                1
417            } else {
418                0
419            })
420        }
421        "SCARD" => {
422            if args.len() < 2 {
423                return RespValue::err("wrong number of arguments for 'scard' command");
424            }
425            RespValue::int(cache.scard(&args[1]) as i64)
426        }
427        "SINTER" => {
428            if args.len() < 3 {
429                return RespValue::err("wrong number of arguments for 'sinter' command");
430            }
431            let result = cache.sinter(&args[1], &args[2]);
432            RespValue::array(result.into_iter().map(|s| RespValue::bulk(&s)).collect())
433        }
434        "SUNION" => {
435            if args.len() < 3 {
436                return RespValue::err("wrong number of arguments for 'sunion' command");
437            }
438            let result = cache.sunion(&args[1], &args[2]);
439            RespValue::array(result.into_iter().map(|s| RespValue::bulk(&s)).collect())
440        }
441
442        // -----------------------------------------------------------------
443        // Hashes
444        // -----------------------------------------------------------------
445        "HSET" => {
446            if args.len() < 4 || (args.len() - 2) % 2 != 0 {
447                return RespValue::err("wrong number of arguments for 'hset' command");
448            }
449            let mut count = 0i64;
450            let mut i = 2;
451            while i < args.len() - 1 {
452                cache.hset(&args[1], &args[i], &args[i + 1]);
453                count += 1;
454                i += 2;
455            }
456            RespValue::int(count)
457        }
458        "HGET" => {
459            if args.len() < 3 {
460                return RespValue::err("wrong number of arguments for 'hget' command");
461            }
462            match cache.hget(&args[1], &args[2]) {
463                Some(v) => RespValue::bulk(&v),
464                None => RespValue::null(),
465            }
466        }
467        "HDEL" => {
468            if args.len() < 3 {
469                return RespValue::err("wrong number of arguments for 'hdel' command");
470            }
471            let mut count = 0i64;
472            for field in &args[2..] {
473                if cache.hdel(&args[1], field) {
474                    count += 1;
475                }
476            }
477            RespValue::int(count)
478        }
479        "HGETALL" => {
480            if args.len() < 2 {
481                return RespValue::err("wrong number of arguments for 'hgetall' command");
482            }
483            let map = cache.hgetall(&args[1]);
484            let mut items = Vec::with_capacity(map.len() * 2);
485            for (k, v) in &map {
486                items.push(RespValue::bulk(k));
487                items.push(RespValue::bulk(v));
488            }
489            RespValue::array(items)
490        }
491        "HEXISTS" => {
492            if args.len() < 3 {
493                return RespValue::err("wrong number of arguments for 'hexists' command");
494            }
495            RespValue::int(if cache.hexists(&args[1], &args[2]) {
496                1
497            } else {
498                0
499            })
500        }
501        "HLEN" => {
502            if args.len() < 2 {
503                return RespValue::err("wrong number of arguments for 'hlen' command");
504            }
505            RespValue::int(cache.hlen(&args[1]) as i64)
506        }
507        "HKEYS" => {
508            if args.len() < 2 {
509                return RespValue::err("wrong number of arguments for 'hkeys' command");
510            }
511            let keys = cache.hkeys(&args[1]);
512            RespValue::array(keys.into_iter().map(|s| RespValue::bulk(&s)).collect())
513        }
514        "HINCRBY" => {
515            if args.len() < 4 {
516                return RespValue::err("wrong number of arguments for 'hincrby' command");
517            }
518            let amount: i64 = match args[3].parse() {
519                Ok(n) => n,
520                Err(_) => return RespValue::err("value is not an integer or out of range"),
521            };
522            match cache.hincrby(&args[1], &args[2], amount) {
523                Ok(n) => RespValue::int(n),
524                Err(e) => RespValue::err(&e),
525            }
526        }
527
528        // -----------------------------------------------------------------
529        // Sorted sets
530        // -----------------------------------------------------------------
531        "ZADD" => {
532            if args.len() < 4 || (args.len() - 2) % 2 != 0 {
533                return RespValue::err("wrong number of arguments for 'zadd' command");
534            }
535            let mut count = 0i64;
536            let mut i = 2;
537            while i < args.len() - 1 {
538                let score: f64 = match args[i].parse() {
539                    Ok(n) => n,
540                    Err(_) => return RespValue::err("value is not a valid float"),
541                };
542                cache.zadd(&args[1], score, &args[i + 1]);
543                count += 1;
544                i += 2;
545            }
546            RespValue::int(count)
547        }
548        "ZREM" => {
549            if args.len() < 3 {
550                return RespValue::err("wrong number of arguments for 'zrem' command");
551            }
552            let mut count = 0i64;
553            for member in &args[2..] {
554                if cache.zrem(&args[1], member) {
555                    count += 1;
556                }
557            }
558            RespValue::int(count)
559        }
560        "ZSCORE" => {
561            if args.len() < 3 {
562                return RespValue::err("wrong number of arguments for 'zscore' command");
563            }
564            match cache.zscore(&args[1], &args[2]) {
565                Some(score) => RespValue::bulk(&format!("{score}")),
566                None => RespValue::null(),
567            }
568        }
569        "ZRANK" => {
570            if args.len() < 3 {
571                return RespValue::err("wrong number of arguments for 'zrank' command");
572            }
573            match cache.zrank(&args[1], &args[2]) {
574                Some(rank) => RespValue::int(rank as i64),
575                None => RespValue::null(),
576            }
577        }
578        "ZRANGE" => {
579            if args.len() < 4 {
580                return RespValue::err("wrong number of arguments for 'zrange' command");
581            }
582            let start: usize = args[2].parse().unwrap_or(0);
583            let stop: usize = args[3].parse().unwrap_or(0);
584            let withscores = args[4..]
585                .iter()
586                .any(|a| a.eq_ignore_ascii_case("WITHSCORES"));
587            let items = cache.zrange(&args[1], start, stop);
588            if withscores {
589                let mut result = Vec::with_capacity(items.len() * 2);
590                for (member, score) in items {
591                    result.push(RespValue::bulk(&member));
592                    result.push(RespValue::bulk(&format!("{score}")));
593                }
594                RespValue::array(result)
595            } else {
596                RespValue::array(
597                    items
598                        .into_iter()
599                        .map(|(m, _)| RespValue::bulk(&m))
600                        .collect(),
601                )
602            }
603        }
604        "ZCARD" => {
605            if args.len() < 2 {
606                return RespValue::err("wrong number of arguments for 'zcard' command");
607            }
608            RespValue::int(cache.zcard(&args[1]) as i64)
609        }
610
611        // -----------------------------------------------------------------
612        // Keys / Server
613        // -----------------------------------------------------------------
614        "KEYS" => {
615            if args.len() < 2 {
616                return RespValue::err("wrong number of arguments for 'keys' command");
617            }
618            let keys = cache.keys(&args[1]);
619            RespValue::array(keys.into_iter().map(|s| RespValue::bulk(&s)).collect())
620        }
621        "TYPE" => {
622            if args.len() < 2 {
623                return RespValue::err("wrong number of arguments for 'type' command");
624            }
625            match cache.key_type(&args[1]) {
626                Some(t) => RespValue::SimpleString(t.to_string()),
627                None => RespValue::SimpleString("none".to_string()),
628            }
629        }
630        "DBSIZE" => RespValue::int(cache.dbsize() as i64),
631        "FLUSHALL" | "FLUSHDB" => {
632            cache.flushall();
633            RespValue::ok()
634        }
635        "INFO" => {
636            let stats = cache.info();
637            let info = format!(
638                "# Server\r\nredis_version:pylon-resp\r\n\r\n\
639                 # Stats\r\nhits:{}\r\nmisses:{}\r\nsets:{}\r\ndeletes:{}\r\nevictions:{}\r\nexpired:{}\r\n\r\n\
640                 # Keyspace\r\nkeys:{}\r\n",
641                stats.hits,
642                stats.misses,
643                stats.sets,
644                stats.deletes,
645                stats.evictions,
646                stats.expired,
647                cache.dbsize()
648            );
649            RespValue::bulk(&info)
650        }
651
652        _ => RespValue::err(&format!("unknown command '{cmd}'")),
653    }
654}
655
656// ---------------------------------------------------------------------------
657// Tests
658// ---------------------------------------------------------------------------
659
660#[cfg(test)]
661mod tests {
662    use super::*;
663    use crate::resp::RespValue;
664    use std::io::{BufReader, Cursor};
665
666    /// Build a RESP command array from string slices and return the wire bytes.
667    fn build_command(parts: &[&str]) -> Vec<u8> {
668        let val = RespValue::array(parts.iter().map(|s| RespValue::bulk(s)).collect());
669        val.serialize()
670    }
671
672    /// Simulate a client session: send raw RESP bytes and collect the response.
673    ///
674    /// Uses an in-memory buffer pair instead of real TCP sockets.
675    fn run_session(cache: &CachePlugin, commands: &[u8]) -> Vec<u8> {
676        let mut input = BufReader::new(Cursor::new(commands.to_vec()));
677        let mut output = Vec::new();
678
679        loop {
680            let value = match crate::resp::parse_resp(&mut input) {
681                Ok(v) => v,
682                Err(_) => break,
683            };
684
685            let args = match value {
686                RespValue::Array(Some(items)) => items,
687                _ => {
688                    output.extend_from_slice(&RespValue::err("Expected array command").serialize());
689                    continue;
690                }
691            };
692
693            let cmd_parts: Vec<String> = args
694                .iter()
695                .filter_map(|v| match v {
696                    RespValue::BulkString(Some(s)) => Some(s.clone()),
697                    RespValue::SimpleString(s) => Some(s.clone()),
698                    _ => None,
699                })
700                .collect();
701
702            if cmd_parts.is_empty() {
703                output.extend_from_slice(&RespValue::err("Empty command").serialize());
704                continue;
705            }
706
707            let response = execute_command(cache, &cmd_parts);
708            output.extend_from_slice(&response.serialize());
709
710            if cmd_parts[0].eq_ignore_ascii_case("QUIT") {
711                break;
712            }
713        }
714
715        output
716    }
717
718    /// Parse the first RESP value from raw bytes.
719    fn parse_response(data: &[u8]) -> RespValue {
720        let mut reader = BufReader::new(data);
721        crate::resp::parse_resp(&mut reader).expect("Failed to parse response")
722    }
723
724    /// Parse all RESP values from raw bytes.
725    fn parse_all_responses(data: &[u8]) -> Vec<RespValue> {
726        let mut reader = BufReader::new(data);
727        let mut results = Vec::new();
728        loop {
729            match crate::resp::parse_resp(&mut reader) {
730                Ok(v) => results.push(v),
731                Err(_) => break,
732            }
733        }
734        results
735    }
736
737    // -- Connection commands --
738
739    #[test]
740    fn ping_pong() {
741        let cache = CachePlugin::new(100);
742        let output = run_session(&cache, &build_command(&["PING"]));
743        assert_eq!(
744            parse_response(&output),
745            RespValue::SimpleString("PONG".into())
746        );
747    }
748
749    #[test]
750    fn ping_with_message() {
751        let cache = CachePlugin::new(100);
752        let output = run_session(&cache, &build_command(&["PING", "hello"]));
753        assert_eq!(parse_response(&output), RespValue::bulk("hello"));
754    }
755
756    #[test]
757    fn echo() {
758        let cache = CachePlugin::new(100);
759        let output = run_session(&cache, &build_command(&["ECHO", "test"]));
760        assert_eq!(parse_response(&output), RespValue::bulk("test"));
761    }
762
763    #[test]
764    fn quit() {
765        let cache = CachePlugin::new(100);
766        let output = run_session(&cache, &build_command(&["QUIT"]));
767        assert_eq!(parse_response(&output), RespValue::ok());
768    }
769
770    // -- String commands --
771
772    #[test]
773    fn set_and_get() {
774        let cache = CachePlugin::new(100);
775        let mut cmds = build_command(&["SET", "mykey", "myval"]);
776        cmds.extend_from_slice(&build_command(&["GET", "mykey"]));
777
778        let responses = parse_all_responses(&run_session(&cache, &cmds));
779        assert_eq!(responses[0], RespValue::ok());
780        assert_eq!(responses[1], RespValue::bulk("myval"));
781    }
782
783    #[test]
784    fn get_nonexistent() {
785        let cache = CachePlugin::new(100);
786        let output = run_session(&cache, &build_command(&["GET", "nope"]));
787        assert_eq!(parse_response(&output), RespValue::null());
788    }
789
790    #[test]
791    fn del_multiple() {
792        let cache = CachePlugin::new(100);
793        cache.set("a", "1", None);
794        cache.set("b", "2", None);
795
796        let output = run_session(&cache, &build_command(&["DEL", "a", "b", "c"]));
797        assert_eq!(parse_response(&output), RespValue::int(2));
798    }
799
800    #[test]
801    fn exists() {
802        let cache = CachePlugin::new(100);
803        cache.set("x", "1", None);
804
805        let mut cmds = build_command(&["EXISTS", "x"]);
806        cmds.extend_from_slice(&build_command(&["EXISTS", "y"]));
807
808        let responses = parse_all_responses(&run_session(&cache, &cmds));
809        assert_eq!(responses[0], RespValue::int(1));
810        assert_eq!(responses[1], RespValue::int(0));
811    }
812
813    #[test]
814    fn incr_decr() {
815        let cache = CachePlugin::new(100);
816        let mut cmds = build_command(&["INCR", "counter"]);
817        cmds.extend_from_slice(&build_command(&["INCR", "counter"]));
818        cmds.extend_from_slice(&build_command(&["DECR", "counter"]));
819
820        let responses = parse_all_responses(&run_session(&cache, &cmds));
821        assert_eq!(responses[0], RespValue::int(1));
822        assert_eq!(responses[1], RespValue::int(2));
823        assert_eq!(responses[2], RespValue::int(1));
824    }
825
826    #[test]
827    fn incrby() {
828        let cache = CachePlugin::new(100);
829        let output = run_session(&cache, &build_command(&["INCRBY", "k", "10"]));
830        assert_eq!(parse_response(&output), RespValue::int(10));
831    }
832
833    #[test]
834    fn setnx() {
835        let cache = CachePlugin::new(100);
836        let mut cmds = build_command(&["SETNX", "k", "first"]);
837        cmds.extend_from_slice(&build_command(&["SETNX", "k", "second"]));
838
839        let responses = parse_all_responses(&run_session(&cache, &cmds));
840        assert_eq!(responses[0], RespValue::int(1));
841        assert_eq!(responses[1], RespValue::int(0));
842    }
843
844    #[test]
845    fn set_nx_flag() {
846        let cache = CachePlugin::new(100);
847        cache.set("k", "existing", None);
848        let output = run_session(&cache, &build_command(&["SET", "k", "new", "NX"]));
849        assert_eq!(parse_response(&output), RespValue::null());
850        assert_eq!(cache.get("k").unwrap(), "existing");
851    }
852
853    #[test]
854    fn set_xx_flag() {
855        let cache = CachePlugin::new(100);
856        // XX on non-existent key should return null.
857        let output = run_session(&cache, &build_command(&["SET", "k", "v", "XX"]));
858        assert_eq!(parse_response(&output), RespValue::null());
859        assert!(cache.get("k").is_none());
860    }
861
862    #[test]
863    fn getset() {
864        let cache = CachePlugin::new(100);
865        cache.set("k", "old", None);
866        let output = run_session(&cache, &build_command(&["GETSET", "k", "new"]));
867        assert_eq!(parse_response(&output), RespValue::bulk("old"));
868        assert_eq!(cache.get("k").unwrap(), "new");
869    }
870
871    #[test]
872    fn mget_mset() {
873        let cache = CachePlugin::new(100);
874        let mut cmds = build_command(&["MSET", "a", "1", "b", "2"]);
875        cmds.extend_from_slice(&build_command(&["MGET", "a", "b", "c"]));
876
877        let responses = parse_all_responses(&run_session(&cache, &cmds));
878        assert_eq!(responses[0], RespValue::ok());
879        assert_eq!(
880            responses[1],
881            RespValue::array(vec![
882                RespValue::bulk("1"),
883                RespValue::bulk("2"),
884                RespValue::null(),
885            ])
886        );
887    }
888
889    // -- TTL commands --
890
891    #[test]
892    fn ttl_no_expiry() {
893        let cache = CachePlugin::new(100);
894        cache.set("k", "v", None);
895        let output = run_session(&cache, &build_command(&["TTL", "k"]));
896        assert_eq!(parse_response(&output), RespValue::int(-1));
897    }
898
899    #[test]
900    fn expire_and_persist() {
901        let cache = CachePlugin::new(100);
902        cache.set("k", "v", None);
903
904        let mut cmds = build_command(&["EXPIRE", "k", "60"]);
905        cmds.extend_from_slice(&build_command(&["PERSIST", "k"]));
906        cmds.extend_from_slice(&build_command(&["TTL", "k"]));
907
908        let responses = parse_all_responses(&run_session(&cache, &cmds));
909        assert_eq!(responses[0], RespValue::int(1)); // EXPIRE ok
910        assert_eq!(responses[1], RespValue::int(1)); // PERSIST ok
911        assert_eq!(responses[2], RespValue::int(-1)); // TTL = no expiry
912    }
913
914    // -- List commands --
915
916    #[test]
917    fn lpush_rpush_lrange() {
918        let cache = CachePlugin::new(100);
919        let mut cmds = build_command(&["RPUSH", "list", "a", "b"]);
920        cmds.extend_from_slice(&build_command(&["LPUSH", "list", "z"]));
921        cmds.extend_from_slice(&build_command(&["LRANGE", "list", "0", "-1"]));
922        cmds.extend_from_slice(&build_command(&["LLEN", "list"]));
923
924        let responses = parse_all_responses(&run_session(&cache, &cmds));
925        assert_eq!(responses[0], RespValue::int(2)); // RPUSH
926        assert_eq!(responses[1], RespValue::int(3)); // LPUSH
927                                                     // LRANGE
928        let items = match &responses[2] {
929            RespValue::Array(Some(v)) => v.clone(),
930            other => panic!("Expected array, got {other:?}"),
931        };
932        assert_eq!(items.len(), 3);
933        assert_eq!(items[0], RespValue::bulk("z"));
934        assert_eq!(responses[3], RespValue::int(3)); // LLEN
935    }
936
937    #[test]
938    fn lpop_rpop() {
939        let cache = CachePlugin::new(100);
940        cache.rpush("list", "a");
941        cache.rpush("list", "b");
942        cache.rpush("list", "c");
943
944        let mut cmds = build_command(&["LPOP", "list"]);
945        cmds.extend_from_slice(&build_command(&["RPOP", "list"]));
946
947        let responses = parse_all_responses(&run_session(&cache, &cmds));
948        assert_eq!(responses[0], RespValue::bulk("a"));
949        assert_eq!(responses[1], RespValue::bulk("c"));
950    }
951
952    // -- Set commands --
953
954    #[test]
955    fn sadd_smembers_scard() {
956        let cache = CachePlugin::new(100);
957        let mut cmds = build_command(&["SADD", "s", "a", "b", "a"]);
958        cmds.extend_from_slice(&build_command(&["SCARD", "s"]));
959
960        let responses = parse_all_responses(&run_session(&cache, &cmds));
961        assert_eq!(responses[0], RespValue::int(2)); // only 2 new
962        assert_eq!(responses[1], RespValue::int(2));
963    }
964
965    #[test]
966    fn sismember() {
967        let cache = CachePlugin::new(100);
968        cache.sadd("s", "x");
969
970        let mut cmds = build_command(&["SISMEMBER", "s", "x"]);
971        cmds.extend_from_slice(&build_command(&["SISMEMBER", "s", "y"]));
972
973        let responses = parse_all_responses(&run_session(&cache, &cmds));
974        assert_eq!(responses[0], RespValue::int(1));
975        assert_eq!(responses[1], RespValue::int(0));
976    }
977
978    // -- Hash commands --
979
980    #[test]
981    fn hset_hget_hgetall() {
982        let cache = CachePlugin::new(100);
983        let mut cmds = build_command(&["HSET", "h", "f1", "v1", "f2", "v2"]);
984        cmds.extend_from_slice(&build_command(&["HGET", "h", "f1"]));
985        cmds.extend_from_slice(&build_command(&["HLEN", "h"]));
986
987        let responses = parse_all_responses(&run_session(&cache, &cmds));
988        assert_eq!(responses[0], RespValue::int(2));
989        assert_eq!(responses[1], RespValue::bulk("v1"));
990        assert_eq!(responses[2], RespValue::int(2));
991    }
992
993    #[test]
994    fn hdel_hexists() {
995        let cache = CachePlugin::new(100);
996        cache.hset("h", "f", "v");
997
998        let mut cmds = build_command(&["HEXISTS", "h", "f"]);
999        cmds.extend_from_slice(&build_command(&["HDEL", "h", "f"]));
1000        cmds.extend_from_slice(&build_command(&["HEXISTS", "h", "f"]));
1001
1002        let responses = parse_all_responses(&run_session(&cache, &cmds));
1003        assert_eq!(responses[0], RespValue::int(1));
1004        assert_eq!(responses[1], RespValue::int(1));
1005        assert_eq!(responses[2], RespValue::int(0));
1006    }
1007
1008    #[test]
1009    fn hincrby() {
1010        let cache = CachePlugin::new(100);
1011        let output = run_session(&cache, &build_command(&["HINCRBY", "h", "f", "5"]));
1012        assert_eq!(parse_response(&output), RespValue::int(5));
1013    }
1014
1015    // -- Sorted set commands --
1016
1017    #[test]
1018    fn zadd_zscore_zrank() {
1019        let cache = CachePlugin::new(100);
1020        let mut cmds = build_command(&["ZADD", "z", "1.5", "a", "2.5", "b"]);
1021        cmds.extend_from_slice(&build_command(&["ZSCORE", "z", "a"]));
1022        cmds.extend_from_slice(&build_command(&["ZRANK", "z", "b"]));
1023        cmds.extend_from_slice(&build_command(&["ZCARD", "z"]));
1024
1025        let responses = parse_all_responses(&run_session(&cache, &cmds));
1026        assert_eq!(responses[0], RespValue::int(2));
1027        assert_eq!(responses[1], RespValue::bulk("1.5"));
1028        assert_eq!(responses[2], RespValue::int(1));
1029        assert_eq!(responses[3], RespValue::int(2));
1030    }
1031
1032    // -- Server commands --
1033
1034    #[test]
1035    fn dbsize_and_flushall() {
1036        let cache = CachePlugin::new(100);
1037        cache.set("a", "1", None);
1038        cache.set("b", "2", None);
1039
1040        let mut cmds = build_command(&["DBSIZE"]);
1041        cmds.extend_from_slice(&build_command(&["FLUSHALL"]));
1042        cmds.extend_from_slice(&build_command(&["DBSIZE"]));
1043
1044        let responses = parse_all_responses(&run_session(&cache, &cmds));
1045        assert_eq!(responses[0], RespValue::int(2));
1046        assert_eq!(responses[1], RespValue::ok());
1047        assert_eq!(responses[2], RespValue::int(0));
1048    }
1049
1050    #[test]
1051    fn keys_pattern() {
1052        let cache = CachePlugin::new(100);
1053        cache.set("user:1", "a", None);
1054        cache.set("user:2", "b", None);
1055        cache.set("session:1", "c", None);
1056
1057        let output = run_session(&cache, &build_command(&["KEYS", "user:*"]));
1058        let resp = parse_response(&output);
1059        match resp {
1060            RespValue::Array(Some(items)) => assert_eq!(items.len(), 2),
1061            other => panic!("Expected array, got {other:?}"),
1062        }
1063    }
1064
1065    #[test]
1066    fn type_command() {
1067        let cache = CachePlugin::new(100);
1068        cache.set("str", "v", None);
1069
1070        let mut cmds = build_command(&["TYPE", "str"]);
1071        cmds.extend_from_slice(&build_command(&["TYPE", "nonexistent"]));
1072
1073        let responses = parse_all_responses(&run_session(&cache, &cmds));
1074        assert_eq!(responses[0], RespValue::SimpleString("string".into()));
1075        assert_eq!(responses[1], RespValue::SimpleString("none".into()));
1076    }
1077
1078    #[test]
1079    fn info_command() {
1080        let cache = CachePlugin::new(100);
1081        let output = run_session(&cache, &build_command(&["INFO"]));
1082        match parse_response(&output) {
1083            RespValue::BulkString(Some(s)) => {
1084                assert!(s.contains("hits:"));
1085                assert!(s.contains("keys:"));
1086            }
1087            other => panic!("Expected bulk string, got {other:?}"),
1088        }
1089    }
1090
1091    #[test]
1092    fn unknown_command() {
1093        let cache = CachePlugin::new(100);
1094        let output = run_session(&cache, &build_command(&["FOOBAR"]));
1095        match parse_response(&output) {
1096            RespValue::Error(msg) => assert!(msg.contains("unknown command")),
1097            other => panic!("Expected error, got {other:?}"),
1098        }
1099    }
1100
1101    // -- Argument validation --
1102
1103    #[test]
1104    fn set_wrong_args() {
1105        let cache = CachePlugin::new(100);
1106        let output = run_session(&cache, &build_command(&["SET", "key"]));
1107        match parse_response(&output) {
1108            RespValue::Error(msg) => assert!(msg.contains("wrong number")),
1109            other => panic!("Expected error, got {other:?}"),
1110        }
1111    }
1112
1113    // -- Multi-command session --
1114
1115    #[test]
1116    fn full_session() {
1117        let cache = CachePlugin::new(100);
1118        let mut cmds = Vec::new();
1119        cmds.extend_from_slice(&build_command(&["PING"]));
1120        cmds.extend_from_slice(&build_command(&["SET", "greeting", "hello"]));
1121        cmds.extend_from_slice(&build_command(&["GET", "greeting"]));
1122        cmds.extend_from_slice(&build_command(&["QUIT"]));
1123
1124        let responses = parse_all_responses(&run_session(&cache, &cmds));
1125        assert_eq!(responses.len(), 4);
1126        assert_eq!(responses[0], RespValue::SimpleString("PONG".into()));
1127        assert_eq!(responses[1], RespValue::ok());
1128        assert_eq!(responses[2], RespValue::bulk("hello"));
1129        assert_eq!(responses[3], RespValue::ok()); // QUIT
1130    }
1131}