1use std::io::{BufReader, Write};
18use std::net::{TcpListener, TcpStream};
19use std::sync::Arc;
20use std::thread;
21
22use pylon_plugin::builtin::cache::CachePlugin;
23
24use crate::resp::{parse_resp, RespValue};
25
26pub fn start_resp_server(cache: Arc<CachePlugin>, port: u16) {
31 let addr = format!("0.0.0.0:{port}");
32 let listener = match TcpListener::bind(&addr) {
33 Ok(l) => l,
34 Err(e) => {
35 tracing::warn!("[resp] Failed to bind RESP server on {addr}: {e}");
36 return;
37 }
38 };
39
40 tracing::warn!("[resp] RESP server listening on resp://localhost:{port}");
41 tracing::warn!("[resp] Compatible with redis-cli: redis-cli -p {port}");
42
43 for stream in listener.incoming() {
44 let stream = match stream {
45 Ok(s) => s,
46 Err(_) => continue,
47 };
48
49 let cache = Arc::clone(&cache);
50 thread::spawn(move || {
51 handle_client(cache, stream);
52 });
53 }
54}
55
56fn handle_client(cache: Arc<CachePlugin>, stream: TcpStream) {
57 let write_stream = match stream.try_clone() {
58 Ok(s) => s,
59 Err(_) => return,
60 };
61 let mut reader = BufReader::new(stream);
62 let mut writer = write_stream;
63
64 loop {
65 let value = match parse_resp(&mut reader) {
66 Ok(v) => v,
67 Err(_) => break, };
69
70 let args = match value {
72 RespValue::Array(Some(items)) => items,
73 _ => {
74 let _ = writer.write_all(&RespValue::err("Expected array command").serialize());
75 continue;
76 }
77 };
78
79 let cmd_parts: Vec<String> = args
80 .iter()
81 .filter_map(|v| match v {
82 RespValue::BulkString(Some(s)) => Some(s.clone()),
83 RespValue::SimpleString(s) => Some(s.clone()),
84 _ => None,
85 })
86 .collect();
87
88 if cmd_parts.is_empty() {
89 let _ = writer.write_all(&RespValue::err("Empty command").serialize());
90 continue;
91 }
92
93 let response = execute_command(&cache, &cmd_parts);
94 let _ = writer.write_all(&response.serialize());
95 let _ = writer.flush();
96
97 if cmd_parts[0].eq_ignore_ascii_case("QUIT") {
99 break;
100 }
101 }
102}
103
104fn execute_command(cache: &CachePlugin, args: &[String]) -> RespValue {
106 let cmd = args[0].to_uppercase();
107
108 match cmd.as_str() {
109 "PING" => {
113 if args.len() > 1 {
114 RespValue::bulk(&args[1])
115 } else {
116 RespValue::SimpleString("PONG".into())
117 }
118 }
119 "ECHO" => {
120 if args.len() < 2 {
121 return RespValue::err("wrong number of arguments for 'echo' command");
122 }
123 RespValue::bulk(&args[1])
124 }
125 "QUIT" => RespValue::ok(),
126 "COMMAND" => RespValue::ok(), "SET" => {
132 if args.len() < 3 {
133 return RespValue::err("wrong number of arguments for 'set' command");
134 }
135
136 let mut ttl: Option<u64> = None;
138 let mut nx = false;
139 let mut xx = false;
140 let mut i = 3;
141 while i < args.len() {
142 match args[i].to_uppercase().as_str() {
143 "EX" => {
144 i += 1;
145 ttl = args.get(i).and_then(|v| v.parse::<u64>().ok());
146 }
147 "PX" => {
148 i += 1;
149 ttl = args.get(i).and_then(|v| v.parse::<u64>().ok()).map(|ms| {
150 if ms == 0 {
153 0
154 } else {
155 (ms + 999) / 1000
156 }
157 });
158 }
159 "NX" => nx = true,
160 "XX" => xx = true,
161 _ => {}
162 }
163 i += 1;
164 }
165
166 if nx {
167 if cache.setnx(&args[1], &args[2], ttl) {
168 RespValue::ok()
169 } else {
170 RespValue::null()
171 }
172 } else if xx {
173 if cache.exists(&args[1]) {
174 cache.set(&args[1], &args[2], ttl);
175 RespValue::ok()
176 } else {
177 RespValue::null()
178 }
179 } else {
180 cache.set(&args[1], &args[2], ttl);
181 RespValue::ok()
182 }
183 }
184 "GET" => {
185 if args.len() < 2 {
186 return RespValue::err("wrong number of arguments for 'get' command");
187 }
188 match cache.get(&args[1]) {
189 Some(v) => RespValue::bulk(&v),
190 None => RespValue::null(),
191 }
192 }
193 "DEL" => {
194 if args.len() < 2 {
195 return RespValue::err("wrong number of arguments for 'del' command");
196 }
197 let mut count = 0i64;
198 for key in &args[1..] {
199 if cache.del(key) {
200 count += 1;
201 }
202 }
203 RespValue::int(count)
204 }
205 "EXISTS" => {
206 if args.len() < 2 {
207 return RespValue::err("wrong number of arguments for 'exists' command");
208 }
209 let mut count = 0i64;
210 for key in &args[1..] {
211 if cache.exists(key) {
212 count += 1;
213 }
214 }
215 RespValue::int(count)
216 }
217 "INCR" => {
218 if args.len() < 2 {
219 return RespValue::err("wrong number of arguments for 'incr' command");
220 }
221 match cache.incr(&args[1]) {
222 Ok(n) => RespValue::int(n),
223 Err(e) => RespValue::err(&e),
224 }
225 }
226 "DECR" => {
227 if args.len() < 2 {
228 return RespValue::err("wrong number of arguments for 'decr' command");
229 }
230 match cache.decr(&args[1]) {
231 Ok(n) => RespValue::int(n),
232 Err(e) => RespValue::err(&e),
233 }
234 }
235 "INCRBY" => {
236 if args.len() < 3 {
237 return RespValue::err("wrong number of arguments for 'incrby' command");
238 }
239 let amount: i64 = match args[2].parse() {
240 Ok(n) => n,
241 Err(_) => return RespValue::err("value is not an integer or out of range"),
242 };
243 match cache.incrby(&args[1], amount) {
244 Ok(n) => RespValue::int(n),
245 Err(e) => RespValue::err(&e),
246 }
247 }
248 "SETNX" => {
249 if args.len() < 3 {
250 return RespValue::err("wrong number of arguments for 'setnx' command");
251 }
252 let set = cache.setnx(&args[1], &args[2], None);
253 RespValue::int(if set { 1 } else { 0 })
254 }
255 "GETSET" => {
256 if args.len() < 3 {
257 return RespValue::err("wrong number of arguments for 'getset' command");
258 }
259 match cache.getset(&args[1], &args[2]) {
260 Some(v) => RespValue::bulk(&v),
261 None => RespValue::null(),
262 }
263 }
264 "MGET" => {
265 if args.len() < 2 {
266 return RespValue::err("wrong number of arguments for 'mget' command");
267 }
268 let keys: Vec<&str> = args[1..].iter().map(|s| s.as_str()).collect();
269 let values = cache.mget(&keys);
270 RespValue::array(
271 values
272 .into_iter()
273 .map(|v| match v {
274 Some(s) => RespValue::bulk(&s),
275 None => RespValue::null(),
276 })
277 .collect(),
278 )
279 }
280 "MSET" => {
281 if args.len() < 3 || (args.len() - 1) % 2 != 0 {
282 return RespValue::err("wrong number of arguments for 'mset' command");
283 }
284 let mut pairs = Vec::new();
285 let mut i = 1;
286 while i < args.len() - 1 {
287 pairs.push((args[i].as_str(), args[i + 1].as_str()));
288 i += 2;
289 }
290 cache.mset(&pairs);
291 RespValue::ok()
292 }
293
294 "EXPIRE" => {
298 if args.len() < 3 {
299 return RespValue::err("wrong number of arguments for 'expire' command");
300 }
301 let secs: u64 = match args[2].parse() {
302 Ok(n) => n,
303 Err(_) => return RespValue::err("value is not an integer or out of range"),
304 };
305 RespValue::int(if cache.expire(&args[1], secs) { 1 } else { 0 })
306 }
307 "PERSIST" => {
308 if args.len() < 2 {
309 return RespValue::err("wrong number of arguments for 'persist' command");
310 }
311 RespValue::int(if cache.persist(&args[1]) { 1 } else { 0 })
312 }
313 "TTL" => {
314 if args.len() < 2 {
315 return RespValue::err("wrong number of arguments for 'ttl' command");
316 }
317 RespValue::int(cache.ttl(&args[1]))
318 }
319
320 "LPUSH" => {
324 if args.len() < 3 {
325 return RespValue::err("wrong number of arguments for 'lpush' command");
326 }
327 let mut len = 0;
328 for val in &args[2..] {
329 len = cache.lpush(&args[1], val);
330 }
331 RespValue::int(len as i64)
332 }
333 "RPUSH" => {
334 if args.len() < 3 {
335 return RespValue::err("wrong number of arguments for 'rpush' command");
336 }
337 let mut len = 0;
338 for val in &args[2..] {
339 len = cache.rpush(&args[1], val);
340 }
341 RespValue::int(len as i64)
342 }
343 "LPOP" => {
344 if args.len() < 2 {
345 return RespValue::err("wrong number of arguments for 'lpop' command");
346 }
347 match cache.lpop(&args[1]) {
348 Some(v) => RespValue::bulk(&v),
349 None => RespValue::null(),
350 }
351 }
352 "RPOP" => {
353 if args.len() < 2 {
354 return RespValue::err("wrong number of arguments for 'rpop' command");
355 }
356 match cache.rpop(&args[1]) {
357 Some(v) => RespValue::bulk(&v),
358 None => RespValue::null(),
359 }
360 }
361 "LRANGE" => {
362 if args.len() < 4 {
363 return RespValue::err("wrong number of arguments for 'lrange' command");
364 }
365 let start: i64 = args[2].parse().unwrap_or(0);
366 let stop: i64 = args[3].parse().unwrap_or(-1);
367 let items = cache.lrange(&args[1], start, stop);
368 RespValue::array(items.into_iter().map(|s| RespValue::bulk(&s)).collect())
369 }
370 "LLEN" => {
371 if args.len() < 2 {
372 return RespValue::err("wrong number of arguments for 'llen' command");
373 }
374 RespValue::int(cache.llen(&args[1]) as i64)
375 }
376
377 "SADD" => {
381 if args.len() < 3 {
382 return RespValue::err("wrong number of arguments for 'sadd' command");
383 }
384 let mut added = 0i64;
385 for member in &args[2..] {
386 if cache.sadd(&args[1], member) {
387 added += 1;
388 }
389 }
390 RespValue::int(added)
391 }
392 "SREM" => {
393 if args.len() < 3 {
394 return RespValue::err("wrong number of arguments for 'srem' command");
395 }
396 let mut removed = 0i64;
397 for member in &args[2..] {
398 if cache.srem(&args[1], member) {
399 removed += 1;
400 }
401 }
402 RespValue::int(removed)
403 }
404 "SMEMBERS" => {
405 if args.len() < 2 {
406 return RespValue::err("wrong number of arguments for 'smembers' command");
407 }
408 let members = cache.smembers(&args[1]);
409 RespValue::array(members.into_iter().map(|s| RespValue::bulk(&s)).collect())
410 }
411 "SISMEMBER" => {
412 if args.len() < 3 {
413 return RespValue::err("wrong number of arguments for 'sismember' command");
414 }
415 RespValue::int(if cache.sismember(&args[1], &args[2]) {
416 1
417 } else {
418 0
419 })
420 }
421 "SCARD" => {
422 if args.len() < 2 {
423 return RespValue::err("wrong number of arguments for 'scard' command");
424 }
425 RespValue::int(cache.scard(&args[1]) as i64)
426 }
427 "SINTER" => {
428 if args.len() < 3 {
429 return RespValue::err("wrong number of arguments for 'sinter' command");
430 }
431 let result = cache.sinter(&args[1], &args[2]);
432 RespValue::array(result.into_iter().map(|s| RespValue::bulk(&s)).collect())
433 }
434 "SUNION" => {
435 if args.len() < 3 {
436 return RespValue::err("wrong number of arguments for 'sunion' command");
437 }
438 let result = cache.sunion(&args[1], &args[2]);
439 RespValue::array(result.into_iter().map(|s| RespValue::bulk(&s)).collect())
440 }
441
442 "HSET" => {
446 if args.len() < 4 || (args.len() - 2) % 2 != 0 {
447 return RespValue::err("wrong number of arguments for 'hset' command");
448 }
449 let mut count = 0i64;
450 let mut i = 2;
451 while i < args.len() - 1 {
452 cache.hset(&args[1], &args[i], &args[i + 1]);
453 count += 1;
454 i += 2;
455 }
456 RespValue::int(count)
457 }
458 "HGET" => {
459 if args.len() < 3 {
460 return RespValue::err("wrong number of arguments for 'hget' command");
461 }
462 match cache.hget(&args[1], &args[2]) {
463 Some(v) => RespValue::bulk(&v),
464 None => RespValue::null(),
465 }
466 }
467 "HDEL" => {
468 if args.len() < 3 {
469 return RespValue::err("wrong number of arguments for 'hdel' command");
470 }
471 let mut count = 0i64;
472 for field in &args[2..] {
473 if cache.hdel(&args[1], field) {
474 count += 1;
475 }
476 }
477 RespValue::int(count)
478 }
479 "HGETALL" => {
480 if args.len() < 2 {
481 return RespValue::err("wrong number of arguments for 'hgetall' command");
482 }
483 let map = cache.hgetall(&args[1]);
484 let mut items = Vec::with_capacity(map.len() * 2);
485 for (k, v) in &map {
486 items.push(RespValue::bulk(k));
487 items.push(RespValue::bulk(v));
488 }
489 RespValue::array(items)
490 }
491 "HEXISTS" => {
492 if args.len() < 3 {
493 return RespValue::err("wrong number of arguments for 'hexists' command");
494 }
495 RespValue::int(if cache.hexists(&args[1], &args[2]) {
496 1
497 } else {
498 0
499 })
500 }
501 "HLEN" => {
502 if args.len() < 2 {
503 return RespValue::err("wrong number of arguments for 'hlen' command");
504 }
505 RespValue::int(cache.hlen(&args[1]) as i64)
506 }
507 "HKEYS" => {
508 if args.len() < 2 {
509 return RespValue::err("wrong number of arguments for 'hkeys' command");
510 }
511 let keys = cache.hkeys(&args[1]);
512 RespValue::array(keys.into_iter().map(|s| RespValue::bulk(&s)).collect())
513 }
514 "HINCRBY" => {
515 if args.len() < 4 {
516 return RespValue::err("wrong number of arguments for 'hincrby' command");
517 }
518 let amount: i64 = match args[3].parse() {
519 Ok(n) => n,
520 Err(_) => return RespValue::err("value is not an integer or out of range"),
521 };
522 match cache.hincrby(&args[1], &args[2], amount) {
523 Ok(n) => RespValue::int(n),
524 Err(e) => RespValue::err(&e),
525 }
526 }
527
528 "ZADD" => {
532 if args.len() < 4 || (args.len() - 2) % 2 != 0 {
533 return RespValue::err("wrong number of arguments for 'zadd' command");
534 }
535 let mut count = 0i64;
536 let mut i = 2;
537 while i < args.len() - 1 {
538 let score: f64 = match args[i].parse() {
539 Ok(n) => n,
540 Err(_) => return RespValue::err("value is not a valid float"),
541 };
542 cache.zadd(&args[1], score, &args[i + 1]);
543 count += 1;
544 i += 2;
545 }
546 RespValue::int(count)
547 }
548 "ZREM" => {
549 if args.len() < 3 {
550 return RespValue::err("wrong number of arguments for 'zrem' command");
551 }
552 let mut count = 0i64;
553 for member in &args[2..] {
554 if cache.zrem(&args[1], member) {
555 count += 1;
556 }
557 }
558 RespValue::int(count)
559 }
560 "ZSCORE" => {
561 if args.len() < 3 {
562 return RespValue::err("wrong number of arguments for 'zscore' command");
563 }
564 match cache.zscore(&args[1], &args[2]) {
565 Some(score) => RespValue::bulk(&format!("{score}")),
566 None => RespValue::null(),
567 }
568 }
569 "ZRANK" => {
570 if args.len() < 3 {
571 return RespValue::err("wrong number of arguments for 'zrank' command");
572 }
573 match cache.zrank(&args[1], &args[2]) {
574 Some(rank) => RespValue::int(rank as i64),
575 None => RespValue::null(),
576 }
577 }
578 "ZRANGE" => {
579 if args.len() < 4 {
580 return RespValue::err("wrong number of arguments for 'zrange' command");
581 }
582 let start: usize = args[2].parse().unwrap_or(0);
583 let stop: usize = args[3].parse().unwrap_or(0);
584 let withscores = args[4..]
585 .iter()
586 .any(|a| a.eq_ignore_ascii_case("WITHSCORES"));
587 let items = cache.zrange(&args[1], start, stop);
588 if withscores {
589 let mut result = Vec::with_capacity(items.len() * 2);
590 for (member, score) in items {
591 result.push(RespValue::bulk(&member));
592 result.push(RespValue::bulk(&format!("{score}")));
593 }
594 RespValue::array(result)
595 } else {
596 RespValue::array(
597 items
598 .into_iter()
599 .map(|(m, _)| RespValue::bulk(&m))
600 .collect(),
601 )
602 }
603 }
604 "ZCARD" => {
605 if args.len() < 2 {
606 return RespValue::err("wrong number of arguments for 'zcard' command");
607 }
608 RespValue::int(cache.zcard(&args[1]) as i64)
609 }
610
611 "KEYS" => {
615 if args.len() < 2 {
616 return RespValue::err("wrong number of arguments for 'keys' command");
617 }
618 let keys = cache.keys(&args[1]);
619 RespValue::array(keys.into_iter().map(|s| RespValue::bulk(&s)).collect())
620 }
621 "TYPE" => {
622 if args.len() < 2 {
623 return RespValue::err("wrong number of arguments for 'type' command");
624 }
625 match cache.key_type(&args[1]) {
626 Some(t) => RespValue::SimpleString(t.to_string()),
627 None => RespValue::SimpleString("none".to_string()),
628 }
629 }
630 "DBSIZE" => RespValue::int(cache.dbsize() as i64),
631 "FLUSHALL" | "FLUSHDB" => {
632 cache.flushall();
633 RespValue::ok()
634 }
635 "INFO" => {
636 let stats = cache.info();
637 let info = format!(
638 "# Server\r\nredis_version:pylon-resp\r\n\r\n\
639 # Stats\r\nhits:{}\r\nmisses:{}\r\nsets:{}\r\ndeletes:{}\r\nevictions:{}\r\nexpired:{}\r\n\r\n\
640 # Keyspace\r\nkeys:{}\r\n",
641 stats.hits,
642 stats.misses,
643 stats.sets,
644 stats.deletes,
645 stats.evictions,
646 stats.expired,
647 cache.dbsize()
648 );
649 RespValue::bulk(&info)
650 }
651
652 _ => RespValue::err(&format!("unknown command '{cmd}'")),
653 }
654}
655
656#[cfg(test)]
661mod tests {
662 use super::*;
663 use crate::resp::RespValue;
664 use std::io::{BufReader, Cursor};
665
666 fn build_command(parts: &[&str]) -> Vec<u8> {
668 let val = RespValue::array(parts.iter().map(|s| RespValue::bulk(s)).collect());
669 val.serialize()
670 }
671
672 fn run_session(cache: &CachePlugin, commands: &[u8]) -> Vec<u8> {
676 let mut input = BufReader::new(Cursor::new(commands.to_vec()));
677 let mut output = Vec::new();
678
679 loop {
680 let value = match crate::resp::parse_resp(&mut input) {
681 Ok(v) => v,
682 Err(_) => break,
683 };
684
685 let args = match value {
686 RespValue::Array(Some(items)) => items,
687 _ => {
688 output.extend_from_slice(&RespValue::err("Expected array command").serialize());
689 continue;
690 }
691 };
692
693 let cmd_parts: Vec<String> = args
694 .iter()
695 .filter_map(|v| match v {
696 RespValue::BulkString(Some(s)) => Some(s.clone()),
697 RespValue::SimpleString(s) => Some(s.clone()),
698 _ => None,
699 })
700 .collect();
701
702 if cmd_parts.is_empty() {
703 output.extend_from_slice(&RespValue::err("Empty command").serialize());
704 continue;
705 }
706
707 let response = execute_command(cache, &cmd_parts);
708 output.extend_from_slice(&response.serialize());
709
710 if cmd_parts[0].eq_ignore_ascii_case("QUIT") {
711 break;
712 }
713 }
714
715 output
716 }
717
718 fn parse_response(data: &[u8]) -> RespValue {
720 let mut reader = BufReader::new(data);
721 crate::resp::parse_resp(&mut reader).expect("Failed to parse response")
722 }
723
724 fn parse_all_responses(data: &[u8]) -> Vec<RespValue> {
726 let mut reader = BufReader::new(data);
727 let mut results = Vec::new();
728 loop {
729 match crate::resp::parse_resp(&mut reader) {
730 Ok(v) => results.push(v),
731 Err(_) => break,
732 }
733 }
734 results
735 }
736
737 #[test]
740 fn ping_pong() {
741 let cache = CachePlugin::new(100);
742 let output = run_session(&cache, &build_command(&["PING"]));
743 assert_eq!(
744 parse_response(&output),
745 RespValue::SimpleString("PONG".into())
746 );
747 }
748
749 #[test]
750 fn ping_with_message() {
751 let cache = CachePlugin::new(100);
752 let output = run_session(&cache, &build_command(&["PING", "hello"]));
753 assert_eq!(parse_response(&output), RespValue::bulk("hello"));
754 }
755
756 #[test]
757 fn echo() {
758 let cache = CachePlugin::new(100);
759 let output = run_session(&cache, &build_command(&["ECHO", "test"]));
760 assert_eq!(parse_response(&output), RespValue::bulk("test"));
761 }
762
763 #[test]
764 fn quit() {
765 let cache = CachePlugin::new(100);
766 let output = run_session(&cache, &build_command(&["QUIT"]));
767 assert_eq!(parse_response(&output), RespValue::ok());
768 }
769
770 #[test]
773 fn set_and_get() {
774 let cache = CachePlugin::new(100);
775 let mut cmds = build_command(&["SET", "mykey", "myval"]);
776 cmds.extend_from_slice(&build_command(&["GET", "mykey"]));
777
778 let responses = parse_all_responses(&run_session(&cache, &cmds));
779 assert_eq!(responses[0], RespValue::ok());
780 assert_eq!(responses[1], RespValue::bulk("myval"));
781 }
782
783 #[test]
784 fn get_nonexistent() {
785 let cache = CachePlugin::new(100);
786 let output = run_session(&cache, &build_command(&["GET", "nope"]));
787 assert_eq!(parse_response(&output), RespValue::null());
788 }
789
790 #[test]
791 fn del_multiple() {
792 let cache = CachePlugin::new(100);
793 cache.set("a", "1", None);
794 cache.set("b", "2", None);
795
796 let output = run_session(&cache, &build_command(&["DEL", "a", "b", "c"]));
797 assert_eq!(parse_response(&output), RespValue::int(2));
798 }
799
800 #[test]
801 fn exists() {
802 let cache = CachePlugin::new(100);
803 cache.set("x", "1", None);
804
805 let mut cmds = build_command(&["EXISTS", "x"]);
806 cmds.extend_from_slice(&build_command(&["EXISTS", "y"]));
807
808 let responses = parse_all_responses(&run_session(&cache, &cmds));
809 assert_eq!(responses[0], RespValue::int(1));
810 assert_eq!(responses[1], RespValue::int(0));
811 }
812
813 #[test]
814 fn incr_decr() {
815 let cache = CachePlugin::new(100);
816 let mut cmds = build_command(&["INCR", "counter"]);
817 cmds.extend_from_slice(&build_command(&["INCR", "counter"]));
818 cmds.extend_from_slice(&build_command(&["DECR", "counter"]));
819
820 let responses = parse_all_responses(&run_session(&cache, &cmds));
821 assert_eq!(responses[0], RespValue::int(1));
822 assert_eq!(responses[1], RespValue::int(2));
823 assert_eq!(responses[2], RespValue::int(1));
824 }
825
826 #[test]
827 fn incrby() {
828 let cache = CachePlugin::new(100);
829 let output = run_session(&cache, &build_command(&["INCRBY", "k", "10"]));
830 assert_eq!(parse_response(&output), RespValue::int(10));
831 }
832
833 #[test]
834 fn setnx() {
835 let cache = CachePlugin::new(100);
836 let mut cmds = build_command(&["SETNX", "k", "first"]);
837 cmds.extend_from_slice(&build_command(&["SETNX", "k", "second"]));
838
839 let responses = parse_all_responses(&run_session(&cache, &cmds));
840 assert_eq!(responses[0], RespValue::int(1));
841 assert_eq!(responses[1], RespValue::int(0));
842 }
843
844 #[test]
845 fn set_nx_flag() {
846 let cache = CachePlugin::new(100);
847 cache.set("k", "existing", None);
848 let output = run_session(&cache, &build_command(&["SET", "k", "new", "NX"]));
849 assert_eq!(parse_response(&output), RespValue::null());
850 assert_eq!(cache.get("k").unwrap(), "existing");
851 }
852
853 #[test]
854 fn set_xx_flag() {
855 let cache = CachePlugin::new(100);
856 let output = run_session(&cache, &build_command(&["SET", "k", "v", "XX"]));
858 assert_eq!(parse_response(&output), RespValue::null());
859 assert!(cache.get("k").is_none());
860 }
861
862 #[test]
863 fn getset() {
864 let cache = CachePlugin::new(100);
865 cache.set("k", "old", None);
866 let output = run_session(&cache, &build_command(&["GETSET", "k", "new"]));
867 assert_eq!(parse_response(&output), RespValue::bulk("old"));
868 assert_eq!(cache.get("k").unwrap(), "new");
869 }
870
871 #[test]
872 fn mget_mset() {
873 let cache = CachePlugin::new(100);
874 let mut cmds = build_command(&["MSET", "a", "1", "b", "2"]);
875 cmds.extend_from_slice(&build_command(&["MGET", "a", "b", "c"]));
876
877 let responses = parse_all_responses(&run_session(&cache, &cmds));
878 assert_eq!(responses[0], RespValue::ok());
879 assert_eq!(
880 responses[1],
881 RespValue::array(vec![
882 RespValue::bulk("1"),
883 RespValue::bulk("2"),
884 RespValue::null(),
885 ])
886 );
887 }
888
889 #[test]
892 fn ttl_no_expiry() {
893 let cache = CachePlugin::new(100);
894 cache.set("k", "v", None);
895 let output = run_session(&cache, &build_command(&["TTL", "k"]));
896 assert_eq!(parse_response(&output), RespValue::int(-1));
897 }
898
899 #[test]
900 fn expire_and_persist() {
901 let cache = CachePlugin::new(100);
902 cache.set("k", "v", None);
903
904 let mut cmds = build_command(&["EXPIRE", "k", "60"]);
905 cmds.extend_from_slice(&build_command(&["PERSIST", "k"]));
906 cmds.extend_from_slice(&build_command(&["TTL", "k"]));
907
908 let responses = parse_all_responses(&run_session(&cache, &cmds));
909 assert_eq!(responses[0], RespValue::int(1)); assert_eq!(responses[1], RespValue::int(1)); assert_eq!(responses[2], RespValue::int(-1)); }
913
914 #[test]
917 fn lpush_rpush_lrange() {
918 let cache = CachePlugin::new(100);
919 let mut cmds = build_command(&["RPUSH", "list", "a", "b"]);
920 cmds.extend_from_slice(&build_command(&["LPUSH", "list", "z"]));
921 cmds.extend_from_slice(&build_command(&["LRANGE", "list", "0", "-1"]));
922 cmds.extend_from_slice(&build_command(&["LLEN", "list"]));
923
924 let responses = parse_all_responses(&run_session(&cache, &cmds));
925 assert_eq!(responses[0], RespValue::int(2)); assert_eq!(responses[1], RespValue::int(3)); let items = match &responses[2] {
929 RespValue::Array(Some(v)) => v.clone(),
930 other => panic!("Expected array, got {other:?}"),
931 };
932 assert_eq!(items.len(), 3);
933 assert_eq!(items[0], RespValue::bulk("z"));
934 assert_eq!(responses[3], RespValue::int(3)); }
936
937 #[test]
938 fn lpop_rpop() {
939 let cache = CachePlugin::new(100);
940 cache.rpush("list", "a");
941 cache.rpush("list", "b");
942 cache.rpush("list", "c");
943
944 let mut cmds = build_command(&["LPOP", "list"]);
945 cmds.extend_from_slice(&build_command(&["RPOP", "list"]));
946
947 let responses = parse_all_responses(&run_session(&cache, &cmds));
948 assert_eq!(responses[0], RespValue::bulk("a"));
949 assert_eq!(responses[1], RespValue::bulk("c"));
950 }
951
952 #[test]
955 fn sadd_smembers_scard() {
956 let cache = CachePlugin::new(100);
957 let mut cmds = build_command(&["SADD", "s", "a", "b", "a"]);
958 cmds.extend_from_slice(&build_command(&["SCARD", "s"]));
959
960 let responses = parse_all_responses(&run_session(&cache, &cmds));
961 assert_eq!(responses[0], RespValue::int(2)); assert_eq!(responses[1], RespValue::int(2));
963 }
964
965 #[test]
966 fn sismember() {
967 let cache = CachePlugin::new(100);
968 cache.sadd("s", "x");
969
970 let mut cmds = build_command(&["SISMEMBER", "s", "x"]);
971 cmds.extend_from_slice(&build_command(&["SISMEMBER", "s", "y"]));
972
973 let responses = parse_all_responses(&run_session(&cache, &cmds));
974 assert_eq!(responses[0], RespValue::int(1));
975 assert_eq!(responses[1], RespValue::int(0));
976 }
977
978 #[test]
981 fn hset_hget_hgetall() {
982 let cache = CachePlugin::new(100);
983 let mut cmds = build_command(&["HSET", "h", "f1", "v1", "f2", "v2"]);
984 cmds.extend_from_slice(&build_command(&["HGET", "h", "f1"]));
985 cmds.extend_from_slice(&build_command(&["HLEN", "h"]));
986
987 let responses = parse_all_responses(&run_session(&cache, &cmds));
988 assert_eq!(responses[0], RespValue::int(2));
989 assert_eq!(responses[1], RespValue::bulk("v1"));
990 assert_eq!(responses[2], RespValue::int(2));
991 }
992
993 #[test]
994 fn hdel_hexists() {
995 let cache = CachePlugin::new(100);
996 cache.hset("h", "f", "v");
997
998 let mut cmds = build_command(&["HEXISTS", "h", "f"]);
999 cmds.extend_from_slice(&build_command(&["HDEL", "h", "f"]));
1000 cmds.extend_from_slice(&build_command(&["HEXISTS", "h", "f"]));
1001
1002 let responses = parse_all_responses(&run_session(&cache, &cmds));
1003 assert_eq!(responses[0], RespValue::int(1));
1004 assert_eq!(responses[1], RespValue::int(1));
1005 assert_eq!(responses[2], RespValue::int(0));
1006 }
1007
1008 #[test]
1009 fn hincrby() {
1010 let cache = CachePlugin::new(100);
1011 let output = run_session(&cache, &build_command(&["HINCRBY", "h", "f", "5"]));
1012 assert_eq!(parse_response(&output), RespValue::int(5));
1013 }
1014
1015 #[test]
1018 fn zadd_zscore_zrank() {
1019 let cache = CachePlugin::new(100);
1020 let mut cmds = build_command(&["ZADD", "z", "1.5", "a", "2.5", "b"]);
1021 cmds.extend_from_slice(&build_command(&["ZSCORE", "z", "a"]));
1022 cmds.extend_from_slice(&build_command(&["ZRANK", "z", "b"]));
1023 cmds.extend_from_slice(&build_command(&["ZCARD", "z"]));
1024
1025 let responses = parse_all_responses(&run_session(&cache, &cmds));
1026 assert_eq!(responses[0], RespValue::int(2));
1027 assert_eq!(responses[1], RespValue::bulk("1.5"));
1028 assert_eq!(responses[2], RespValue::int(1));
1029 assert_eq!(responses[3], RespValue::int(2));
1030 }
1031
1032 #[test]
1035 fn dbsize_and_flushall() {
1036 let cache = CachePlugin::new(100);
1037 cache.set("a", "1", None);
1038 cache.set("b", "2", None);
1039
1040 let mut cmds = build_command(&["DBSIZE"]);
1041 cmds.extend_from_slice(&build_command(&["FLUSHALL"]));
1042 cmds.extend_from_slice(&build_command(&["DBSIZE"]));
1043
1044 let responses = parse_all_responses(&run_session(&cache, &cmds));
1045 assert_eq!(responses[0], RespValue::int(2));
1046 assert_eq!(responses[1], RespValue::ok());
1047 assert_eq!(responses[2], RespValue::int(0));
1048 }
1049
1050 #[test]
1051 fn keys_pattern() {
1052 let cache = CachePlugin::new(100);
1053 cache.set("user:1", "a", None);
1054 cache.set("user:2", "b", None);
1055 cache.set("session:1", "c", None);
1056
1057 let output = run_session(&cache, &build_command(&["KEYS", "user:*"]));
1058 let resp = parse_response(&output);
1059 match resp {
1060 RespValue::Array(Some(items)) => assert_eq!(items.len(), 2),
1061 other => panic!("Expected array, got {other:?}"),
1062 }
1063 }
1064
1065 #[test]
1066 fn type_command() {
1067 let cache = CachePlugin::new(100);
1068 cache.set("str", "v", None);
1069
1070 let mut cmds = build_command(&["TYPE", "str"]);
1071 cmds.extend_from_slice(&build_command(&["TYPE", "nonexistent"]));
1072
1073 let responses = parse_all_responses(&run_session(&cache, &cmds));
1074 assert_eq!(responses[0], RespValue::SimpleString("string".into()));
1075 assert_eq!(responses[1], RespValue::SimpleString("none".into()));
1076 }
1077
1078 #[test]
1079 fn info_command() {
1080 let cache = CachePlugin::new(100);
1081 let output = run_session(&cache, &build_command(&["INFO"]));
1082 match parse_response(&output) {
1083 RespValue::BulkString(Some(s)) => {
1084 assert!(s.contains("hits:"));
1085 assert!(s.contains("keys:"));
1086 }
1087 other => panic!("Expected bulk string, got {other:?}"),
1088 }
1089 }
1090
1091 #[test]
1092 fn unknown_command() {
1093 let cache = CachePlugin::new(100);
1094 let output = run_session(&cache, &build_command(&["FOOBAR"]));
1095 match parse_response(&output) {
1096 RespValue::Error(msg) => assert!(msg.contains("unknown command")),
1097 other => panic!("Expected error, got {other:?}"),
1098 }
1099 }
1100
1101 #[test]
1104 fn set_wrong_args() {
1105 let cache = CachePlugin::new(100);
1106 let output = run_session(&cache, &build_command(&["SET", "key"]));
1107 match parse_response(&output) {
1108 RespValue::Error(msg) => assert!(msg.contains("wrong number")),
1109 other => panic!("Expected error, got {other:?}"),
1110 }
1111 }
1112
1113 #[test]
1116 fn full_session() {
1117 let cache = CachePlugin::new(100);
1118 let mut cmds = Vec::new();
1119 cmds.extend_from_slice(&build_command(&["PING"]));
1120 cmds.extend_from_slice(&build_command(&["SET", "greeting", "hello"]));
1121 cmds.extend_from_slice(&build_command(&["GET", "greeting"]));
1122 cmds.extend_from_slice(&build_command(&["QUIT"]));
1123
1124 let responses = parse_all_responses(&run_session(&cache, &cmds));
1125 assert_eq!(responses.len(), 4);
1126 assert_eq!(responses[0], RespValue::SimpleString("PONG".into()));
1127 assert_eq!(responses[1], RespValue::ok());
1128 assert_eq!(responses[2], RespValue::bulk("hello"));
1129 assert_eq!(responses[3], RespValue::ok()); }
1131}