1use crate::pubsub::PubSubBroker;
8use pylon_plugin::builtin::cache::CachePlugin;
9
10pub fn handle_cache_command(cache: &CachePlugin, body: &str) -> (u16, String) {
19 let data: serde_json::Value = match serde_json::from_str(body) {
20 Ok(v) => v,
21 Err(e) => {
22 return (
23 400,
24 serde_json::json!({"ok": false, "error": format!("Invalid JSON: {e}")}).to_string(),
25 )
26 }
27 };
28
29 let cmd = data
30 .get("cmd")
31 .and_then(|v| v.as_str())
32 .unwrap_or("")
33 .to_uppercase();
34 let key = data.get("key").and_then(|v| v.as_str()).unwrap_or("");
35
36 match cmd.as_str() {
37 "SET" => {
38 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
39 let ttl = data.get("ttl").and_then(|v| v.as_u64());
40 cache.set(key, value, ttl);
41 (200, serde_json::json!({"ok": true}).to_string())
42 }
43 "GET" => match cache.get(key) {
44 Some(v) => (
45 200,
46 serde_json::json!({"ok": true, "result": v}).to_string(),
47 ),
48 None => (
49 200,
50 serde_json::json!({"ok": true, "result": null}).to_string(),
51 ),
52 },
53 "DEL" => {
54 let deleted = cache.del(key);
55 (
56 200,
57 serde_json::json!({"ok": true, "result": deleted}).to_string(),
58 )
59 }
60 "EXISTS" => {
61 let exists = cache.exists(key);
62 (
63 200,
64 serde_json::json!({"ok": true, "result": exists}).to_string(),
65 )
66 }
67 "INCR" => match cache.incr(key) {
68 Ok(n) => (
69 200,
70 serde_json::json!({"ok": true, "result": n}).to_string(),
71 ),
72 Err(e) => (
73 400,
74 serde_json::json!({"ok": false, "error": e}).to_string(),
75 ),
76 },
77 "DECR" => match cache.decr(key) {
78 Ok(n) => (
79 200,
80 serde_json::json!({"ok": true, "result": n}).to_string(),
81 ),
82 Err(e) => (
83 400,
84 serde_json::json!({"ok": false, "error": e}).to_string(),
85 ),
86 },
87 "INCRBY" => {
88 let amount = data.get("amount").and_then(|v| v.as_i64()).unwrap_or(1);
89 match cache.incrby(key, amount) {
90 Ok(n) => (
91 200,
92 serde_json::json!({"ok": true, "result": n}).to_string(),
93 ),
94 Err(e) => (
95 400,
96 serde_json::json!({"ok": false, "error": e}).to_string(),
97 ),
98 }
99 }
100 "SETNX" => {
101 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
102 let ttl = data.get("ttl").and_then(|v| v.as_u64());
103 let was_set = cache.setnx(key, value, ttl);
104 (
105 200,
106 serde_json::json!({"ok": true, "result": was_set}).to_string(),
107 )
108 }
109 "GETSET" => {
110 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
111 let old = cache.getset(key, value);
112 (
113 200,
114 serde_json::json!({"ok": true, "result": old}).to_string(),
115 )
116 }
117 "MGET" => {
118 let keys_arr: Vec<&str> = data
119 .get("keys")
120 .and_then(|v| v.as_array())
121 .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
122 .unwrap_or_default();
123 let results = cache.mget(&keys_arr);
124 (
125 200,
126 serde_json::json!({"ok": true, "result": results}).to_string(),
127 )
128 }
129 "MSET" => {
130 let pairs_val = data.get("pairs").and_then(|v| v.as_object());
131 if let Some(obj) = pairs_val {
132 let pairs: Vec<(&str, &str)> = obj
133 .iter()
134 .filter_map(|(k, v)| v.as_str().map(|s| (k.as_str(), s)))
135 .collect();
136 cache.mset(&pairs);
137 (200, serde_json::json!({"ok": true}).to_string())
138 } else {
139 (
140 400,
141 serde_json::json!({
142 "error": {
143 "code": "INVALID_ARG",
144 "message": "pairs object required"
145 }
146 })
147 .to_string(),
148 )
149 }
150 }
151 "LPUSH" => {
152 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
153 let len = cache.lpush(key, value);
154 (
155 200,
156 serde_json::json!({"ok": true, "result": len}).to_string(),
157 )
158 }
159 "RPUSH" => {
160 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
161 let len = cache.rpush(key, value);
162 (
163 200,
164 serde_json::json!({"ok": true, "result": len}).to_string(),
165 )
166 }
167 "LPOP" => {
168 let val = cache.lpop(key);
169 (
170 200,
171 serde_json::json!({"ok": true, "result": val}).to_string(),
172 )
173 }
174 "RPOP" => {
175 let val = cache.rpop(key);
176 (
177 200,
178 serde_json::json!({"ok": true, "result": val}).to_string(),
179 )
180 }
181 "LRANGE" => {
182 let start = data.get("start").and_then(|v| v.as_i64()).unwrap_or(0);
183 let stop = data.get("stop").and_then(|v| v.as_i64()).unwrap_or(-1);
184 let items = cache.lrange(key, start, stop);
185 (
186 200,
187 serde_json::json!({"ok": true, "result": items}).to_string(),
188 )
189 }
190 "LLEN" => {
191 let len = cache.llen(key);
192 (
193 200,
194 serde_json::json!({"ok": true, "result": len}).to_string(),
195 )
196 }
197 "SADD" => {
198 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
199 let added = cache.sadd(key, member);
200 (
201 200,
202 serde_json::json!({"ok": true, "result": added}).to_string(),
203 )
204 }
205 "SREM" => {
206 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
207 let removed = cache.srem(key, member);
208 (
209 200,
210 serde_json::json!({"ok": true, "result": removed}).to_string(),
211 )
212 }
213 "SMEMBERS" => {
214 let members = cache.smembers(key);
215 (
216 200,
217 serde_json::json!({"ok": true, "result": members}).to_string(),
218 )
219 }
220 "SISMEMBER" => {
221 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
222 let is_member = cache.sismember(key, member);
223 (
224 200,
225 serde_json::json!({"ok": true, "result": is_member}).to_string(),
226 )
227 }
228 "SCARD" => {
229 let count = cache.scard(key);
230 (
231 200,
232 serde_json::json!({"ok": true, "result": count}).to_string(),
233 )
234 }
235 "SINTER" => {
236 let key2 = data.get("key2").and_then(|v| v.as_str()).unwrap_or("");
237 let inter = cache.sinter(key, key2);
238 (
239 200,
240 serde_json::json!({"ok": true, "result": inter}).to_string(),
241 )
242 }
243 "SUNION" => {
244 let key2 = data.get("key2").and_then(|v| v.as_str()).unwrap_or("");
245 let union_result = cache.sunion(key, key2);
246 (
247 200,
248 serde_json::json!({"ok": true, "result": union_result}).to_string(),
249 )
250 }
251 "HSET" => {
252 let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
253 let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
254 cache.hset(key, field, value);
255 (200, serde_json::json!({"ok": true}).to_string())
256 }
257 "HGET" => {
258 let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
259 let val = cache.hget(key, field);
260 (
261 200,
262 serde_json::json!({"ok": true, "result": val}).to_string(),
263 )
264 }
265 "HDEL" => {
266 let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
267 let deleted = cache.hdel(key, field);
268 (
269 200,
270 serde_json::json!({"ok": true, "result": deleted}).to_string(),
271 )
272 }
273 "HGETALL" => {
274 let all = cache.hgetall(key);
275 (
276 200,
277 serde_json::json!({"ok": true, "result": all}).to_string(),
278 )
279 }
280 "HEXISTS" => {
281 let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
282 let exists = cache.hexists(key, field);
283 (
284 200,
285 serde_json::json!({"ok": true, "result": exists}).to_string(),
286 )
287 }
288 "HLEN" => {
289 let len = cache.hlen(key);
290 (
291 200,
292 serde_json::json!({"ok": true, "result": len}).to_string(),
293 )
294 }
295 "HKEYS" => {
296 let keys = cache.hkeys(key);
297 (
298 200,
299 serde_json::json!({"ok": true, "result": keys}).to_string(),
300 )
301 }
302 "HINCRBY" => {
303 let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
304 let amount = data.get("amount").and_then(|v| v.as_i64()).unwrap_or(1);
305 match cache.hincrby(key, field, amount) {
306 Ok(n) => (
307 200,
308 serde_json::json!({"ok": true, "result": n}).to_string(),
309 ),
310 Err(e) => (
311 400,
312 serde_json::json!({"ok": false, "error": e}).to_string(),
313 ),
314 }
315 }
316 "ZADD" => {
317 let score = data.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
318 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
319 cache.zadd(key, score, member);
320 (200, serde_json::json!({"ok": true}).to_string())
321 }
322 "ZREM" => {
323 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
324 let removed = cache.zrem(key, member);
325 (
326 200,
327 serde_json::json!({"ok": true, "result": removed}).to_string(),
328 )
329 }
330 "ZSCORE" => {
331 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
332 let score = cache.zscore(key, member);
333 (
334 200,
335 serde_json::json!({"ok": true, "result": score}).to_string(),
336 )
337 }
338 "ZRANK" => {
339 let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
340 let rank = cache.zrank(key, member);
341 (
342 200,
343 serde_json::json!({"ok": true, "result": rank}).to_string(),
344 )
345 }
346 "ZRANGE" => {
347 let start = data.get("start").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
348 let stop = data.get("stop").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
349 let members = cache.zrange(key, start, stop);
350 let result: Vec<serde_json::Value> = members
351 .iter()
352 .map(|(m, s)| serde_json::json!({"member": m, "score": s}))
353 .collect();
354 (
355 200,
356 serde_json::json!({"ok": true, "result": result}).to_string(),
357 )
358 }
359 "ZCARD" => {
360 let count = cache.zcard(key);
361 (
362 200,
363 serde_json::json!({"ok": true, "result": count}).to_string(),
364 )
365 }
366 "KEYS" => {
367 let pattern = data.get("pattern").and_then(|v| v.as_str()).unwrap_or("*");
368 let keys = cache.keys(pattern);
369 (
370 200,
371 serde_json::json!({"ok": true, "result": keys}).to_string(),
372 )
373 }
374 "TTL" => {
375 let ttl = cache.ttl(key);
376 (
377 200,
378 serde_json::json!({"ok": true, "result": ttl}).to_string(),
379 )
380 }
381 "EXPIRE" => {
382 let seconds = data.get("seconds").and_then(|v| v.as_u64()).unwrap_or(0);
383 let ok = cache.expire(key, seconds);
384 (
385 200,
386 serde_json::json!({"ok": true, "result": ok}).to_string(),
387 )
388 }
389 "PERSIST" => {
390 let ok = cache.persist(key);
391 (
392 200,
393 serde_json::json!({"ok": true, "result": ok}).to_string(),
394 )
395 }
396 "TYPE" => {
397 let t = cache.key_type(key);
398 (
399 200,
400 serde_json::json!({"ok": true, "result": t}).to_string(),
401 )
402 }
403 "DBSIZE" => {
404 let size = cache.dbsize();
405 (
406 200,
407 serde_json::json!({"ok": true, "result": size}).to_string(),
408 )
409 }
410 "FLUSHALL" => {
411 cache.flushall();
412 (200, serde_json::json!({"ok": true}).to_string())
413 }
414 "INFO" => {
415 let info = cache.info();
416 (
417 200,
418 serde_json::json!({"ok": true, "result": info}).to_string(),
419 )
420 }
421 "CLEANUP" => {
422 let removed = cache.cleanup_expired();
423 (
424 200,
425 serde_json::json!({"ok": true, "result": removed}).to_string(),
426 )
427 }
428 _ => (
429 400,
430 serde_json::json!({"ok": false, "error": format!("Unknown cache command: {cmd}")})
431 .to_string(),
432 ),
433 }
434}
435
436pub fn handle_cache_get(cache: &CachePlugin, key: &str) -> (u16, String) {
438 match cache.get(key) {
439 Some(v) => (
440 200,
441 serde_json::json!({"ok": true, "result": v}).to_string(),
442 ),
443 None => (
444 404,
445 serde_json::json!({
446 "error": {
447 "code": "NOT_FOUND",
448 "message": "key not found"
449 }
450 })
451 .to_string(),
452 ),
453 }
454}
455
456pub fn handle_cache_delete(cache: &CachePlugin, key: &str) -> (u16, String) {
458 let deleted = cache.del(key);
459 (
460 if deleted { 200 } else { 404 },
461 serde_json::json!({"ok": deleted}).to_string(),
462 )
463}
464
465pub fn handle_pubsub_publish(pubsub: &PubSubBroker, body: &str) -> (u16, String) {
471 let data: serde_json::Value = match serde_json::from_str(body) {
472 Ok(v) => v,
473 Err(e) => {
474 return (
475 400,
476 serde_json::json!({"ok": false, "error": format!("Invalid JSON: {e}")}).to_string(),
477 )
478 }
479 };
480
481 let channel = match data.get("channel").and_then(|v| v.as_str()) {
482 Some(ch) => ch,
483 None => {
484 return (
485 400,
486 serde_json::json!({"error": {"code": "MISSING_CHANNEL", "message": "channel is required"}}).to_string(),
487 )
488 }
489 };
490
491 let message = match data.get("message").and_then(|v| v.as_str()) {
492 Some(m) => m,
493 None => {
494 return (
495 400,
496 serde_json::json!({"error": {"code": "MISSING_MESSAGE", "message": "message is required"}}).to_string(),
497 )
498 }
499 };
500
501 let subscribers = pubsub.publish(channel, message);
502 (
503 200,
504 serde_json::json!({"ok": true, "subscribers": subscribers}).to_string(),
505 )
506}
507
508pub fn handle_pubsub_channels(pubsub: &PubSubBroker) -> (u16, String) {
510 let channels = pubsub.channels();
511 let result: Vec<serde_json::Value> = channels
512 .iter()
513 .map(|(ch, count)| serde_json::json!({"channel": ch, "subscribers": count}))
514 .collect();
515 (
516 200,
517 serde_json::json!({"ok": true, "result": result}).to_string(),
518 )
519}
520
521pub fn handle_pubsub_history(pubsub: &PubSubBroker, channel: &str, url: &str) -> (u16, String) {
525 let limit: usize = url
526 .split("limit=")
527 .nth(1)
528 .and_then(|s| s.split('&').next())
529 .and_then(|s| s.parse().ok())
530 .unwrap_or(50)
531 .min(1000);
532 let messages = pubsub.history(channel, limit);
533 (
534 200,
535 serde_json::json!({"ok": true, "result": messages}).to_string(),
536 )
537}
538
#[cfg(test)]
mod tests {
    use super::*;

    /// Cache fixture; the constructor argument is the same value for every test.
    fn make_cache() -> CachePlugin {
        CachePlugin::new(1000)
    }

    /// Broker fixture; the constructor argument is the same value for every test.
    fn make_pubsub() -> PubSubBroker {
        PubSubBroker::new(100)
    }

    /// Parses a handler's response body, panicking if it is not valid JSON.
    fn json_body(body: &str) -> serde_json::Value {
        serde_json::from_str(body).unwrap()
    }

    #[test]
    fn cache_set_and_get() {
        let cache = make_cache();

        let set_request = r#"{"cmd": "SET", "key": "hello", "value": "world"}"#;
        let (set_status, _) = handle_cache_command(&cache, set_request);
        assert_eq!(set_status, 200);

        let get_request = r#"{"cmd": "GET", "key": "hello"}"#;
        let (get_status, body) = handle_cache_command(&cache, get_request);
        assert_eq!(get_status, 200);
        assert_eq!(json_body(&body)["result"], "world");
    }

    #[test]
    fn cache_get_shorthand() {
        let cache = make_cache();
        cache.set("mykey", "myval", None);

        let (status, body) = handle_cache_get(&cache, "mykey");
        assert_eq!(status, 200);
        assert_eq!(json_body(&body)["result"], "myval");
    }

    #[test]
    fn cache_get_shorthand_missing() {
        let cache = make_cache();
        let (status, _) = handle_cache_get(&cache, "nokey");
        assert_eq!(status, 404);
    }

    #[test]
    fn cache_delete_shorthand() {
        let cache = make_cache();
        cache.set("k", "v", None);

        // First delete removes the entry; the second finds nothing.
        let (first_status, _) = handle_cache_delete(&cache, "k");
        assert_eq!(first_status, 200);
        let (second_status, _) = handle_cache_delete(&cache, "k");
        assert_eq!(second_status, 404);
    }

    #[test]
    fn cache_invalid_json() {
        let cache = make_cache();
        let (status, body) = handle_cache_command(&cache, "not json");
        assert_eq!(status, 400);
        assert!(body.contains("Invalid JSON"));
    }

    #[test]
    fn cache_unknown_command() {
        let cache = make_cache();
        let request = r#"{"cmd": "NOTACMD", "key": "k"}"#;
        let (status, body) = handle_cache_command(&cache, request);
        assert_eq!(status, 400);
        assert!(body.contains("Unknown cache command"));
    }

    #[test]
    fn pubsub_publish_and_channels() {
        let pubsub = make_pubsub();
        let request = r#"{"channel": "chat", "message": "hello"}"#;
        let (status, body) = handle_pubsub_publish(&pubsub, request);
        assert_eq!(status, 200);
        assert_eq!(json_body(&body)["ok"], true);
    }

    #[test]
    fn pubsub_publish_missing_channel() {
        let pubsub = make_pubsub();
        let (status, _) = handle_pubsub_publish(&pubsub, r#"{"message": "hello"}"#);
        assert_eq!(status, 400);
    }

    #[test]
    fn pubsub_publish_missing_message() {
        let pubsub = make_pubsub();
        let (status, _) = handle_pubsub_publish(&pubsub, r#"{"channel": "ch"}"#);
        assert_eq!(status, 400);
    }

    #[test]
    fn pubsub_history() {
        let pubsub = make_pubsub();
        pubsub.publish("news", "headline 1");
        pubsub.publish("news", "headline 2");

        let (status, body) =
            handle_pubsub_history(&pubsub, "news", "/pubsub/history/news?limit=10");
        assert_eq!(status, 200);

        let parsed = json_body(&body);
        assert_eq!(parsed["result"].as_array().unwrap().len(), 2);
    }

    #[test]
    fn pubsub_channels_list() {
        let pubsub = make_pubsub();
        let (status, body) = handle_pubsub_channels(&pubsub);
        assert_eq!(status, 200);
        assert_eq!(json_body(&body)["ok"], true);
    }

    #[test]
    fn cache_incr_decr() {
        let cache = make_cache();

        let (status, body) = handle_cache_command(&cache, r#"{"cmd": "INCR", "key": "counter"}"#);
        assert_eq!(status, 200);
        assert_eq!(json_body(&body)["result"], 1);

        let (_, body) = handle_cache_command(&cache, r#"{"cmd": "DECR", "key": "counter"}"#);
        assert_eq!(json_body(&body)["result"], 0);
    }

    #[test]
    fn cache_dbsize_and_flushall() {
        let cache = make_cache();
        cache.set("a", "1", None);
        cache.set("b", "2", None);

        let (_, body) = handle_cache_command(&cache, r#"{"cmd": "DBSIZE"}"#);
        assert_eq!(json_body(&body)["result"], 2);

        let (status, _) = handle_cache_command(&cache, r#"{"cmd": "FLUSHALL"}"#);
        assert_eq!(status, 200);
        assert_eq!(cache.dbsize(), 0);
    }
}