Skip to main content

pylon_runtime/
cache_handlers.rs

1//! Shared cache and pub/sub HTTP request handlers.
2//!
3//! These functions implement the cache command dispatch and pub/sub operations.
4//! They are used by both the main server (`/api/cache`, `/api/pubsub/*`) and
5//! the standalone cache server (`/cache`, `/pubsub/*`).
6
7use crate::pubsub::PubSubBroker;
8use pylon_plugin::builtin::cache::CachePlugin;
9
10// ---------------------------------------------------------------------------
11// Cache command dispatch
12// ---------------------------------------------------------------------------
13
14/// Handle a `POST /cache` (or `POST /api/cache`) request body.
15///
16/// Parses the JSON body, extracts the `cmd` field, and dispatches to the
17/// appropriate `CachePlugin` method. Returns `(http_status, json_body)`.
18pub fn handle_cache_command(cache: &CachePlugin, body: &str) -> (u16, String) {
19    let data: serde_json::Value = match serde_json::from_str(body) {
20        Ok(v) => v,
21        Err(e) => {
22            return (
23                400,
24                serde_json::json!({"ok": false, "error": format!("Invalid JSON: {e}")}).to_string(),
25            )
26        }
27    };
28
29    let cmd = data
30        .get("cmd")
31        .and_then(|v| v.as_str())
32        .unwrap_or("")
33        .to_uppercase();
34    let key = data.get("key").and_then(|v| v.as_str()).unwrap_or("");
35
36    match cmd.as_str() {
37        "SET" => {
38            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
39            let ttl = data.get("ttl").and_then(|v| v.as_u64());
40            cache.set(key, value, ttl);
41            (200, serde_json::json!({"ok": true}).to_string())
42        }
43        "GET" => match cache.get(key) {
44            Some(v) => (
45                200,
46                serde_json::json!({"ok": true, "result": v}).to_string(),
47            ),
48            None => (
49                200,
50                serde_json::json!({"ok": true, "result": null}).to_string(),
51            ),
52        },
53        "DEL" => {
54            let deleted = cache.del(key);
55            (
56                200,
57                serde_json::json!({"ok": true, "result": deleted}).to_string(),
58            )
59        }
60        "EXISTS" => {
61            let exists = cache.exists(key);
62            (
63                200,
64                serde_json::json!({"ok": true, "result": exists}).to_string(),
65            )
66        }
67        "INCR" => match cache.incr(key) {
68            Ok(n) => (
69                200,
70                serde_json::json!({"ok": true, "result": n}).to_string(),
71            ),
72            Err(e) => (
73                400,
74                serde_json::json!({"ok": false, "error": e}).to_string(),
75            ),
76        },
77        "DECR" => match cache.decr(key) {
78            Ok(n) => (
79                200,
80                serde_json::json!({"ok": true, "result": n}).to_string(),
81            ),
82            Err(e) => (
83                400,
84                serde_json::json!({"ok": false, "error": e}).to_string(),
85            ),
86        },
87        "INCRBY" => {
88            let amount = data.get("amount").and_then(|v| v.as_i64()).unwrap_or(1);
89            match cache.incrby(key, amount) {
90                Ok(n) => (
91                    200,
92                    serde_json::json!({"ok": true, "result": n}).to_string(),
93                ),
94                Err(e) => (
95                    400,
96                    serde_json::json!({"ok": false, "error": e}).to_string(),
97                ),
98            }
99        }
100        "SETNX" => {
101            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
102            let ttl = data.get("ttl").and_then(|v| v.as_u64());
103            let was_set = cache.setnx(key, value, ttl);
104            (
105                200,
106                serde_json::json!({"ok": true, "result": was_set}).to_string(),
107            )
108        }
109        "GETSET" => {
110            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
111            let old = cache.getset(key, value);
112            (
113                200,
114                serde_json::json!({"ok": true, "result": old}).to_string(),
115            )
116        }
117        "MGET" => {
118            let keys_arr: Vec<&str> = data
119                .get("keys")
120                .and_then(|v| v.as_array())
121                .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
122                .unwrap_or_default();
123            let results = cache.mget(&keys_arr);
124            (
125                200,
126                serde_json::json!({"ok": true, "result": results}).to_string(),
127            )
128        }
129        "MSET" => {
130            let pairs_val = data.get("pairs").and_then(|v| v.as_object());
131            if let Some(obj) = pairs_val {
132                let pairs: Vec<(&str, &str)> = obj
133                    .iter()
134                    .filter_map(|(k, v)| v.as_str().map(|s| (k.as_str(), s)))
135                    .collect();
136                cache.mset(&pairs);
137                (200, serde_json::json!({"ok": true}).to_string())
138            } else {
139                (
140                    400,
141                    serde_json::json!({
142                        "error": {
143                            "code": "INVALID_ARG",
144                            "message": "pairs object required"
145                        }
146                    })
147                    .to_string(),
148                )
149            }
150        }
151        "LPUSH" => {
152            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
153            let len = cache.lpush(key, value);
154            (
155                200,
156                serde_json::json!({"ok": true, "result": len}).to_string(),
157            )
158        }
159        "RPUSH" => {
160            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
161            let len = cache.rpush(key, value);
162            (
163                200,
164                serde_json::json!({"ok": true, "result": len}).to_string(),
165            )
166        }
167        "LPOP" => {
168            let val = cache.lpop(key);
169            (
170                200,
171                serde_json::json!({"ok": true, "result": val}).to_string(),
172            )
173        }
174        "RPOP" => {
175            let val = cache.rpop(key);
176            (
177                200,
178                serde_json::json!({"ok": true, "result": val}).to_string(),
179            )
180        }
181        "LRANGE" => {
182            let start = data.get("start").and_then(|v| v.as_i64()).unwrap_or(0);
183            let stop = data.get("stop").and_then(|v| v.as_i64()).unwrap_or(-1);
184            let items = cache.lrange(key, start, stop);
185            (
186                200,
187                serde_json::json!({"ok": true, "result": items}).to_string(),
188            )
189        }
190        "LLEN" => {
191            let len = cache.llen(key);
192            (
193                200,
194                serde_json::json!({"ok": true, "result": len}).to_string(),
195            )
196        }
197        "SADD" => {
198            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
199            let added = cache.sadd(key, member);
200            (
201                200,
202                serde_json::json!({"ok": true, "result": added}).to_string(),
203            )
204        }
205        "SREM" => {
206            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
207            let removed = cache.srem(key, member);
208            (
209                200,
210                serde_json::json!({"ok": true, "result": removed}).to_string(),
211            )
212        }
213        "SMEMBERS" => {
214            let members = cache.smembers(key);
215            (
216                200,
217                serde_json::json!({"ok": true, "result": members}).to_string(),
218            )
219        }
220        "SISMEMBER" => {
221            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
222            let is_member = cache.sismember(key, member);
223            (
224                200,
225                serde_json::json!({"ok": true, "result": is_member}).to_string(),
226            )
227        }
228        "SCARD" => {
229            let count = cache.scard(key);
230            (
231                200,
232                serde_json::json!({"ok": true, "result": count}).to_string(),
233            )
234        }
235        "SINTER" => {
236            let key2 = data.get("key2").and_then(|v| v.as_str()).unwrap_or("");
237            let inter = cache.sinter(key, key2);
238            (
239                200,
240                serde_json::json!({"ok": true, "result": inter}).to_string(),
241            )
242        }
243        "SUNION" => {
244            let key2 = data.get("key2").and_then(|v| v.as_str()).unwrap_or("");
245            let union_result = cache.sunion(key, key2);
246            (
247                200,
248                serde_json::json!({"ok": true, "result": union_result}).to_string(),
249            )
250        }
251        "HSET" => {
252            let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
253            let value = data.get("value").and_then(|v| v.as_str()).unwrap_or("");
254            cache.hset(key, field, value);
255            (200, serde_json::json!({"ok": true}).to_string())
256        }
257        "HGET" => {
258            let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
259            let val = cache.hget(key, field);
260            (
261                200,
262                serde_json::json!({"ok": true, "result": val}).to_string(),
263            )
264        }
265        "HDEL" => {
266            let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
267            let deleted = cache.hdel(key, field);
268            (
269                200,
270                serde_json::json!({"ok": true, "result": deleted}).to_string(),
271            )
272        }
273        "HGETALL" => {
274            let all = cache.hgetall(key);
275            (
276                200,
277                serde_json::json!({"ok": true, "result": all}).to_string(),
278            )
279        }
280        "HEXISTS" => {
281            let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
282            let exists = cache.hexists(key, field);
283            (
284                200,
285                serde_json::json!({"ok": true, "result": exists}).to_string(),
286            )
287        }
288        "HLEN" => {
289            let len = cache.hlen(key);
290            (
291                200,
292                serde_json::json!({"ok": true, "result": len}).to_string(),
293            )
294        }
295        "HKEYS" => {
296            let keys = cache.hkeys(key);
297            (
298                200,
299                serde_json::json!({"ok": true, "result": keys}).to_string(),
300            )
301        }
302        "HINCRBY" => {
303            let field = data.get("field").and_then(|v| v.as_str()).unwrap_or("");
304            let amount = data.get("amount").and_then(|v| v.as_i64()).unwrap_or(1);
305            match cache.hincrby(key, field, amount) {
306                Ok(n) => (
307                    200,
308                    serde_json::json!({"ok": true, "result": n}).to_string(),
309                ),
310                Err(e) => (
311                    400,
312                    serde_json::json!({"ok": false, "error": e}).to_string(),
313                ),
314            }
315        }
316        "ZADD" => {
317            let score = data.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
318            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
319            cache.zadd(key, score, member);
320            (200, serde_json::json!({"ok": true}).to_string())
321        }
322        "ZREM" => {
323            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
324            let removed = cache.zrem(key, member);
325            (
326                200,
327                serde_json::json!({"ok": true, "result": removed}).to_string(),
328            )
329        }
330        "ZSCORE" => {
331            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
332            let score = cache.zscore(key, member);
333            (
334                200,
335                serde_json::json!({"ok": true, "result": score}).to_string(),
336            )
337        }
338        "ZRANK" => {
339            let member = data.get("member").and_then(|v| v.as_str()).unwrap_or("");
340            let rank = cache.zrank(key, member);
341            (
342                200,
343                serde_json::json!({"ok": true, "result": rank}).to_string(),
344            )
345        }
346        "ZRANGE" => {
347            let start = data.get("start").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
348            let stop = data.get("stop").and_then(|v| v.as_u64()).unwrap_or(10) as usize;
349            let members = cache.zrange(key, start, stop);
350            let result: Vec<serde_json::Value> = members
351                .iter()
352                .map(|(m, s)| serde_json::json!({"member": m, "score": s}))
353                .collect();
354            (
355                200,
356                serde_json::json!({"ok": true, "result": result}).to_string(),
357            )
358        }
359        "ZCARD" => {
360            let count = cache.zcard(key);
361            (
362                200,
363                serde_json::json!({"ok": true, "result": count}).to_string(),
364            )
365        }
366        "KEYS" => {
367            let pattern = data.get("pattern").and_then(|v| v.as_str()).unwrap_or("*");
368            let keys = cache.keys(pattern);
369            (
370                200,
371                serde_json::json!({"ok": true, "result": keys}).to_string(),
372            )
373        }
374        "TTL" => {
375            let ttl = cache.ttl(key);
376            (
377                200,
378                serde_json::json!({"ok": true, "result": ttl}).to_string(),
379            )
380        }
381        "EXPIRE" => {
382            let seconds = data.get("seconds").and_then(|v| v.as_u64()).unwrap_or(0);
383            let ok = cache.expire(key, seconds);
384            (
385                200,
386                serde_json::json!({"ok": true, "result": ok}).to_string(),
387            )
388        }
389        "PERSIST" => {
390            let ok = cache.persist(key);
391            (
392                200,
393                serde_json::json!({"ok": true, "result": ok}).to_string(),
394            )
395        }
396        "TYPE" => {
397            let t = cache.key_type(key);
398            (
399                200,
400                serde_json::json!({"ok": true, "result": t}).to_string(),
401            )
402        }
403        "DBSIZE" => {
404            let size = cache.dbsize();
405            (
406                200,
407                serde_json::json!({"ok": true, "result": size}).to_string(),
408            )
409        }
410        "FLUSHALL" => {
411            cache.flushall();
412            (200, serde_json::json!({"ok": true}).to_string())
413        }
414        "INFO" => {
415            let info = cache.info();
416            (
417                200,
418                serde_json::json!({"ok": true, "result": info}).to_string(),
419            )
420        }
421        "CLEANUP" => {
422            let removed = cache.cleanup_expired();
423            (
424                200,
425                serde_json::json!({"ok": true, "result": removed}).to_string(),
426            )
427        }
428        _ => (
429            400,
430            serde_json::json!({"ok": false, "error": format!("Unknown cache command: {cmd}")})
431                .to_string(),
432        ),
433    }
434}
435
436/// Handle a `GET /cache/:key` shorthand request.
437pub fn handle_cache_get(cache: &CachePlugin, key: &str) -> (u16, String) {
438    match cache.get(key) {
439        Some(v) => (
440            200,
441            serde_json::json!({"ok": true, "result": v}).to_string(),
442        ),
443        None => (
444            404,
445            serde_json::json!({
446                "error": {
447                    "code": "NOT_FOUND",
448                    "message": "key not found"
449                }
450            })
451            .to_string(),
452        ),
453    }
454}
455
456/// Handle a `DELETE /cache/:key` shorthand request.
457pub fn handle_cache_delete(cache: &CachePlugin, key: &str) -> (u16, String) {
458    let deleted = cache.del(key);
459    (
460        if deleted { 200 } else { 404 },
461        serde_json::json!({"ok": deleted}).to_string(),
462    )
463}
464
465// ---------------------------------------------------------------------------
466// Pub/Sub handlers
467// ---------------------------------------------------------------------------
468
469/// Handle a `POST /pubsub/publish` request.
470pub fn handle_pubsub_publish(pubsub: &PubSubBroker, body: &str) -> (u16, String) {
471    let data: serde_json::Value = match serde_json::from_str(body) {
472        Ok(v) => v,
473        Err(e) => {
474            return (
475                400,
476                serde_json::json!({"ok": false, "error": format!("Invalid JSON: {e}")}).to_string(),
477            )
478        }
479    };
480
481    let channel = match data.get("channel").and_then(|v| v.as_str()) {
482        Some(ch) => ch,
483        None => {
484            return (
485                400,
486                serde_json::json!({"error": {"code": "MISSING_CHANNEL", "message": "channel is required"}}).to_string(),
487            )
488        }
489    };
490
491    let message = match data.get("message").and_then(|v| v.as_str()) {
492        Some(m) => m,
493        None => {
494            return (
495                400,
496                serde_json::json!({"error": {"code": "MISSING_MESSAGE", "message": "message is required"}}).to_string(),
497            )
498        }
499    };
500
501    let subscribers = pubsub.publish(channel, message);
502    (
503        200,
504        serde_json::json!({"ok": true, "subscribers": subscribers}).to_string(),
505    )
506}
507
508/// Handle a `GET /pubsub/channels` request.
509pub fn handle_pubsub_channels(pubsub: &PubSubBroker) -> (u16, String) {
510    let channels = pubsub.channels();
511    let result: Vec<serde_json::Value> = channels
512        .iter()
513        .map(|(ch, count)| serde_json::json!({"channel": ch, "subscribers": count}))
514        .collect();
515    (
516        200,
517        serde_json::json!({"ok": true, "result": result}).to_string(),
518    )
519}
520
521/// Handle a `GET /pubsub/history/:channel` request.
522///
523/// The `url` parameter is the full URL path (used to parse `?limit=N`).
524pub fn handle_pubsub_history(pubsub: &PubSubBroker, channel: &str, url: &str) -> (u16, String) {
525    let limit: usize = url
526        .split("limit=")
527        .nth(1)
528        .and_then(|s| s.split('&').next())
529        .and_then(|s| s.parse().ok())
530        .unwrap_or(50)
531        .min(1000);
532    let messages = pubsub.history(channel, limit);
533    (
534        200,
535        serde_json::json!({"ok": true, "result": messages}).to_string(),
536    )
537}
538
539// ---------------------------------------------------------------------------
540// Tests
541// ---------------------------------------------------------------------------
542
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh cache for one test; `1000` is passed straight to `CachePlugin::new`.
    fn make_cache() -> CachePlugin {
        CachePlugin::new(1000)
    }

    /// Fresh broker for one test; `100` is passed straight to `PubSubBroker::new`.
    fn make_pubsub() -> PubSubBroker {
        PubSubBroker::new(100)
    }

    /// Decode a handler's JSON reply, panicking if it is not valid JSON.
    fn parse_body(body: &str) -> serde_json::Value {
        serde_json::from_str(body).unwrap()
    }

    // SET followed by GET round-trips the stored value.
    #[test]
    fn cache_set_and_get() {
        let cache = make_cache();

        let set_request = r#"{"cmd": "SET", "key": "hello", "value": "world"}"#;
        let (set_status, _) = handle_cache_command(&cache, set_request);
        assert_eq!(set_status, 200);

        let get_request = r#"{"cmd": "GET", "key": "hello"}"#;
        let (get_status, get_body) = handle_cache_command(&cache, get_request);
        assert_eq!(get_status, 200);
        let reply = parse_body(&get_body);
        assert_eq!(reply["result"], "world");
    }

    // The GET shorthand returns a value stored directly on the plugin.
    #[test]
    fn cache_get_shorthand() {
        let cache = make_cache();
        cache.set("mykey", "myval", None);

        let (status, body) = handle_cache_get(&cache, "mykey");

        assert_eq!(status, 200);
        let reply = parse_body(&body);
        assert_eq!(reply["result"], "myval");
    }

    // The GET shorthand answers 404 for an unknown key.
    #[test]
    fn cache_get_shorthand_missing() {
        let cache = make_cache();
        let (status, _) = handle_cache_get(&cache, "nokey");
        assert_eq!(status, 404);
    }

    // The DELETE shorthand answers 200 once, then 404 when the key is gone.
    #[test]
    fn cache_delete_shorthand() {
        let cache = make_cache();
        cache.set("k", "v", None);

        let (first_status, _) = handle_cache_delete(&cache, "k");
        assert_eq!(first_status, 200);

        let (second_status, _) = handle_cache_delete(&cache, "k");
        assert_eq!(second_status, 404);
    }

    // A body that is not JSON is rejected with 400.
    #[test]
    fn cache_invalid_json() {
        let cache = make_cache();
        let (status, body) = handle_cache_command(&cache, "not json");
        assert_eq!(status, 400);
        assert!(body.contains("Invalid JSON"));
    }

    // An unrecognised `cmd` is rejected with 400.
    #[test]
    fn cache_unknown_command() {
        let cache = make_cache();
        let request = r#"{"cmd": "NOTACMD", "key": "k"}"#;
        let (status, body) = handle_cache_command(&cache, request);
        assert_eq!(status, 400);
        assert!(body.contains("Unknown cache command"));
    }

    // Publishing a well-formed message succeeds.
    #[test]
    fn pubsub_publish_and_channels() {
        let pubsub = make_pubsub();
        let request = r#"{"channel": "chat", "message": "hello"}"#;

        let (status, body) = handle_pubsub_publish(&pubsub, request);

        assert_eq!(status, 200);
        let reply = parse_body(&body);
        assert_eq!(reply["ok"], true);
    }

    // Publishing without `channel` is rejected with 400.
    #[test]
    fn pubsub_publish_missing_channel() {
        let pubsub = make_pubsub();
        let (status, _) = handle_pubsub_publish(&pubsub, r#"{"message": "hello"}"#);
        assert_eq!(status, 400);
    }

    // Publishing without `message` is rejected with 400.
    #[test]
    fn pubsub_publish_missing_message() {
        let pubsub = make_pubsub();
        let (status, _) = handle_pubsub_publish(&pubsub, r#"{"channel": "ch"}"#);
        assert_eq!(status, 400);
    }

    // History returns both messages published to the channel.
    #[test]
    fn pubsub_history() {
        let pubsub = make_pubsub();
        for headline in ["headline 1", "headline 2"] {
            pubsub.publish("news", headline);
        }

        let (status, body) =
            handle_pubsub_history(&pubsub, "news", "/pubsub/history/news?limit=10");

        assert_eq!(status, 200);
        let reply = parse_body(&body);
        assert_eq!(reply["result"].as_array().unwrap().len(), 2);
    }

    // Listing channels on a fresh broker succeeds.
    #[test]
    fn pubsub_channels_list() {
        let pubsub = make_pubsub();
        let (status, body) = handle_pubsub_channels(&pubsub);
        assert_eq!(status, 200);
        let reply = parse_body(&body);
        assert_eq!(reply["ok"], true);
    }

    // INCR on a fresh key yields 1; DECR brings it back to 0.
    #[test]
    fn cache_incr_decr() {
        let cache = make_cache();

        let (incr_status, incr_body) =
            handle_cache_command(&cache, r#"{"cmd": "INCR", "key": "counter"}"#);
        assert_eq!(incr_status, 200);
        let incr_reply = parse_body(&incr_body);
        assert_eq!(incr_reply["result"], 1);

        let (_, decr_body) = handle_cache_command(&cache, r#"{"cmd": "DECR", "key": "counter"}"#);
        let decr_reply = parse_body(&decr_body);
        assert_eq!(decr_reply["result"], 0);
    }

    // DBSIZE counts stored keys; FLUSHALL empties the cache.
    #[test]
    fn cache_dbsize_and_flushall() {
        let cache = make_cache();
        cache.set("a", "1", None);
        cache.set("b", "2", None);

        let (_, size_body) = handle_cache_command(&cache, r#"{"cmd": "DBSIZE"}"#);
        let size_reply = parse_body(&size_body);
        assert_eq!(size_reply["result"], 2);

        let (flush_status, _) = handle_cache_command(&cache, r#"{"cmd": "FLUSHALL"}"#);
        assert_eq!(flush_status, 200);
        assert_eq!(cache.dbsize(), 0);
    }
}