Skip to main content

pylon_plugin/builtin/
cache_client.rs

1//! Remote cache client.
2//!
3//! Connects to a standalone pylon cache server over HTTP. Provides the
4//! same logical API as [`CachePlugin`](super::cache::CachePlugin) so callers
5//! can swap between embedded and remote cache without changing application
6//! logic.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! use pylon_plugin::builtin::cache_client::RemoteCacheClient;
12//!
13//! let client = RemoteCacheClient::new("http://localhost:6380");
14//! client.set("greeting", "hello", None).unwrap();
15//! assert_eq!(client.get("greeting").unwrap(), Some("hello".to_string()));
16//! ```
17
18use std::collections::HashMap;
19use std::io::{Read, Write};
20use std::net::TcpStream;
21use std::time::Duration;
22
23/// A client that connects to a remote pylon cache server.
24///
25/// Uses raw HTTP/1.1 over TCP to avoid pulling in heavy HTTP client
26/// dependencies. Each request opens a new connection (`Connection: close`).
27/// This keeps the client simple and dependency-free.
28pub struct RemoteCacheClient {
29    /// Parsed host:port string (no scheme, no trailing path).
30    host_port: String,
31}
32
33impl RemoteCacheClient {
34    /// Create a new client pointing at the given base URL.
35    ///
36    /// The URL should be of the form `http://host:port`. Any trailing slash
37    /// or path component is stripped.
38    pub fn new(base_url: &str) -> Self {
39        let stripped = base_url.trim_end_matches('/');
40        let host_port = stripped
41            .strip_prefix("http://")
42            .unwrap_or(stripped)
43            .split('/')
44            .next()
45            .unwrap_or(stripped)
46            .to_string();
47        Self { host_port }
48    }
49
50    /// Send a POST request to `path` with a JSON body.
51    /// Returns the parsed JSON response body.
52    fn post(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value, String> {
53        let payload = body.to_string();
54        let request = format!(
55            "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
56            path, self.host_port, payload.len(), payload
57        );
58
59        let mut stream = TcpStream::connect(&self.host_port)
60            .map_err(|e| format!("Cache connection to {} failed: {e}", self.host_port))?;
61        stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
62        stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
63
64        stream
65            .write_all(request.as_bytes())
66            .map_err(|e| format!("Write failed: {e}"))?;
67
68        // Shut down the write half so the server knows we are done sending.
69        stream.shutdown(std::net::Shutdown::Write).ok();
70
71        let mut response = String::new();
72        stream
73            .read_to_string(&mut response)
74            .map_err(|e| format!("Read failed: {e}"))?;
75
76        // Parse HTTP response -- body comes after the first blank line.
77        let body_str = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
78        serde_json::from_str(body_str).map_err(|e| format!("Parse failed: {e}"))
79    }
80
81    /// Send a GET request to `path`.
82    fn get(&self, path: &str) -> Result<serde_json::Value, String> {
83        let request = format!(
84            "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
85            path, self.host_port
86        );
87
88        let mut stream = TcpStream::connect(&self.host_port)
89            .map_err(|e| format!("Cache connection to {} failed: {e}", self.host_port))?;
90        stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
91
92        stream
93            .write_all(request.as_bytes())
94            .map_err(|e| format!("Write failed: {e}"))?;
95        stream.shutdown(std::net::Shutdown::Write).ok();
96
97        let mut response = String::new();
98        stream
99            .read_to_string(&mut response)
100            .map_err(|e| format!("Read failed: {e}"))?;
101
102        let body_str = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
103        serde_json::from_str(body_str).map_err(|e| format!("Parse failed: {e}"))
104    }
105
106    /// Execute a cache command via `POST /cache`.
107    fn execute(&self, cmd: serde_json::Value) -> Result<serde_json::Value, String> {
108        self.post("/cache", &cmd)
109    }
110
111    // -----------------------------------------------------------------------
112    // String operations
113    // -----------------------------------------------------------------------
114
115    /// SET key value [EX seconds]
116    pub fn set(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<(), String> {
117        let mut cmd = serde_json::json!({"cmd": "SET", "key": key, "value": value});
118        if let Some(t) = ttl {
119            cmd["ttl"] = serde_json::json!(t);
120        }
121        self.execute(cmd)?;
122        Ok(())
123    }
124
125    /// GET key
126    pub fn get_key(&self, key: &str) -> Result<Option<String>, String> {
127        let result = self.execute(serde_json::json!({"cmd": "GET", "key": key}))?;
128        Ok(result.get("result").and_then(|v| {
129            if v.is_null() {
130                None
131            } else {
132                v.as_str().map(|s| s.to_string())
133            }
134        }))
135    }
136
137    /// DEL key
138    pub fn del(&self, key: &str) -> Result<bool, String> {
139        let result = self.execute(serde_json::json!({"cmd": "DEL", "key": key}))?;
140        Ok(result
141            .get("result")
142            .and_then(|v| v.as_bool())
143            .unwrap_or(false))
144    }
145
146    /// EXISTS key
147    pub fn exists(&self, key: &str) -> Result<bool, String> {
148        let result = self.execute(serde_json::json!({"cmd": "EXISTS", "key": key}))?;
149        Ok(result
150            .get("result")
151            .and_then(|v| v.as_bool())
152            .unwrap_or(false))
153    }
154
155    /// INCR key
156    pub fn incr(&self, key: &str) -> Result<i64, String> {
157        let result = self.execute(serde_json::json!({"cmd": "INCR", "key": key}))?;
158        result
159            .get("result")
160            .and_then(|v| v.as_i64())
161            .ok_or_else(|| {
162                result
163                    .get("error")
164                    .and_then(|v| v.as_str())
165                    .unwrap_or("INCR failed")
166                    .to_string()
167            })
168    }
169
170    /// DECR key
171    pub fn decr(&self, key: &str) -> Result<i64, String> {
172        let result = self.execute(serde_json::json!({"cmd": "DECR", "key": key}))?;
173        result
174            .get("result")
175            .and_then(|v| v.as_i64())
176            .ok_or_else(|| {
177                result
178                    .get("error")
179                    .and_then(|v| v.as_str())
180                    .unwrap_or("DECR failed")
181                    .to_string()
182            })
183    }
184
185    /// INCRBY key amount
186    pub fn incrby(&self, key: &str, amount: i64) -> Result<i64, String> {
187        let result =
188            self.execute(serde_json::json!({"cmd": "INCRBY", "key": key, "amount": amount}))?;
189        result
190            .get("result")
191            .and_then(|v| v.as_i64())
192            .ok_or_else(|| {
193                result
194                    .get("error")
195                    .and_then(|v| v.as_str())
196                    .unwrap_or("INCRBY failed")
197                    .to_string()
198            })
199    }
200
201    /// SETNX key value [EX seconds]
202    pub fn setnx(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<bool, String> {
203        let mut cmd = serde_json::json!({"cmd": "SETNX", "key": key, "value": value});
204        if let Some(t) = ttl {
205            cmd["ttl"] = serde_json::json!(t);
206        }
207        let result = self.execute(cmd)?;
208        Ok(result
209            .get("result")
210            .and_then(|v| v.as_bool())
211            .unwrap_or(false))
212    }
213
214    /// GETSET key value
215    pub fn getset(&self, key: &str, value: &str) -> Result<Option<String>, String> {
216        let result =
217            self.execute(serde_json::json!({"cmd": "GETSET", "key": key, "value": value}))?;
218        Ok(result.get("result").and_then(|v| {
219            if v.is_null() {
220                None
221            } else {
222                v.as_str().map(|s| s.to_string())
223            }
224        }))
225    }
226
227    // -----------------------------------------------------------------------
228    // List operations
229    // -----------------------------------------------------------------------
230
231    /// LPUSH key value
232    pub fn lpush(&self, key: &str, value: &str) -> Result<usize, String> {
233        let result =
234            self.execute(serde_json::json!({"cmd": "LPUSH", "key": key, "value": value}))?;
235        result
236            .get("result")
237            .and_then(|v| v.as_u64())
238            .map(|n| n as usize)
239            .ok_or_else(|| "LPUSH failed".into())
240    }
241
242    /// RPUSH key value
243    pub fn rpush(&self, key: &str, value: &str) -> Result<usize, String> {
244        let result =
245            self.execute(serde_json::json!({"cmd": "RPUSH", "key": key, "value": value}))?;
246        result
247            .get("result")
248            .and_then(|v| v.as_u64())
249            .map(|n| n as usize)
250            .ok_or_else(|| "RPUSH failed".into())
251    }
252
253    /// LPOP key
254    pub fn lpop(&self, key: &str) -> Result<Option<String>, String> {
255        let result = self.execute(serde_json::json!({"cmd": "LPOP", "key": key}))?;
256        Ok(result.get("result").and_then(|v| {
257            if v.is_null() {
258                None
259            } else {
260                v.as_str().map(|s| s.to_string())
261            }
262        }))
263    }
264
265    /// RPOP key
266    pub fn rpop(&self, key: &str) -> Result<Option<String>, String> {
267        let result = self.execute(serde_json::json!({"cmd": "RPOP", "key": key}))?;
268        Ok(result.get("result").and_then(|v| {
269            if v.is_null() {
270                None
271            } else {
272                v.as_str().map(|s| s.to_string())
273            }
274        }))
275    }
276
277    /// LRANGE key start stop
278    pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Result<Vec<String>, String> {
279        let result = self.execute(
280            serde_json::json!({"cmd": "LRANGE", "key": key, "start": start, "stop": stop}),
281        )?;
282        Ok(result
283            .get("result")
284            .and_then(|v| v.as_array())
285            .map(|arr| {
286                arr.iter()
287                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
288                    .collect()
289            })
290            .unwrap_or_default())
291    }
292
293    /// LLEN key
294    pub fn llen(&self, key: &str) -> Result<usize, String> {
295        let result = self.execute(serde_json::json!({"cmd": "LLEN", "key": key}))?;
296        result
297            .get("result")
298            .and_then(|v| v.as_u64())
299            .map(|n| n as usize)
300            .ok_or_else(|| "LLEN failed".into())
301    }
302
303    // -----------------------------------------------------------------------
304    // Set operations
305    // -----------------------------------------------------------------------
306
307    /// SADD key member
308    pub fn sadd(&self, key: &str, member: &str) -> Result<bool, String> {
309        let result =
310            self.execute(serde_json::json!({"cmd": "SADD", "key": key, "member": member}))?;
311        Ok(result
312            .get("result")
313            .and_then(|v| v.as_bool())
314            .unwrap_or(false))
315    }
316
317    /// SREM key member
318    pub fn srem(&self, key: &str, member: &str) -> Result<bool, String> {
319        let result =
320            self.execute(serde_json::json!({"cmd": "SREM", "key": key, "member": member}))?;
321        Ok(result
322            .get("result")
323            .and_then(|v| v.as_bool())
324            .unwrap_or(false))
325    }
326
327    /// SMEMBERS key
328    pub fn smembers(&self, key: &str) -> Result<Vec<String>, String> {
329        let result = self.execute(serde_json::json!({"cmd": "SMEMBERS", "key": key}))?;
330        Ok(result
331            .get("result")
332            .and_then(|v| v.as_array())
333            .map(|arr| {
334                arr.iter()
335                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
336                    .collect()
337            })
338            .unwrap_or_default())
339    }
340
341    /// SISMEMBER key member
342    pub fn sismember(&self, key: &str, member: &str) -> Result<bool, String> {
343        let result =
344            self.execute(serde_json::json!({"cmd": "SISMEMBER", "key": key, "member": member}))?;
345        Ok(result
346            .get("result")
347            .and_then(|v| v.as_bool())
348            .unwrap_or(false))
349    }
350
351    /// SCARD key
352    pub fn scard(&self, key: &str) -> Result<usize, String> {
353        let result = self.execute(serde_json::json!({"cmd": "SCARD", "key": key}))?;
354        result
355            .get("result")
356            .and_then(|v| v.as_u64())
357            .map(|n| n as usize)
358            .ok_or_else(|| "SCARD failed".into())
359    }
360
361    // -----------------------------------------------------------------------
362    // Hash operations
363    // -----------------------------------------------------------------------
364
365    /// HSET key field value
366    pub fn hset(&self, key: &str, field: &str, value: &str) -> Result<(), String> {
367        self.execute(
368            serde_json::json!({"cmd": "HSET", "key": key, "field": field, "value": value}),
369        )?;
370        Ok(())
371    }
372
373    /// HGET key field
374    pub fn hget(&self, key: &str, field: &str) -> Result<Option<String>, String> {
375        let result =
376            self.execute(serde_json::json!({"cmd": "HGET", "key": key, "field": field}))?;
377        Ok(result.get("result").and_then(|v| {
378            if v.is_null() {
379                None
380            } else {
381                v.as_str().map(|s| s.to_string())
382            }
383        }))
384    }
385
386    /// HDEL key field
387    pub fn hdel(&self, key: &str, field: &str) -> Result<bool, String> {
388        let result =
389            self.execute(serde_json::json!({"cmd": "HDEL", "key": key, "field": field}))?;
390        Ok(result
391            .get("result")
392            .and_then(|v| v.as_bool())
393            .unwrap_or(false))
394    }
395
396    /// HGETALL key
397    pub fn hgetall(&self, key: &str) -> Result<HashMap<String, String>, String> {
398        let result = self.execute(serde_json::json!({"cmd": "HGETALL", "key": key}))?;
399        let mut map = HashMap::new();
400        if let Some(obj) = result.get("result").and_then(|v| v.as_object()) {
401            for (k, v) in obj {
402                if let Some(s) = v.as_str() {
403                    map.insert(k.clone(), s.to_string());
404                }
405            }
406        }
407        Ok(map)
408    }
409
410    /// HEXISTS key field
411    pub fn hexists(&self, key: &str, field: &str) -> Result<bool, String> {
412        let result =
413            self.execute(serde_json::json!({"cmd": "HEXISTS", "key": key, "field": field}))?;
414        Ok(result
415            .get("result")
416            .and_then(|v| v.as_bool())
417            .unwrap_or(false))
418    }
419
420    /// HLEN key
421    pub fn hlen(&self, key: &str) -> Result<usize, String> {
422        let result = self.execute(serde_json::json!({"cmd": "HLEN", "key": key}))?;
423        result
424            .get("result")
425            .and_then(|v| v.as_u64())
426            .map(|n| n as usize)
427            .ok_or_else(|| "HLEN failed".into())
428    }
429
430    /// HKEYS key
431    pub fn hkeys(&self, key: &str) -> Result<Vec<String>, String> {
432        let result = self.execute(serde_json::json!({"cmd": "HKEYS", "key": key}))?;
433        Ok(result
434            .get("result")
435            .and_then(|v| v.as_array())
436            .map(|arr| {
437                arr.iter()
438                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
439                    .collect()
440            })
441            .unwrap_or_default())
442    }
443
444    /// HINCRBY key field amount
445    pub fn hincrby(&self, key: &str, field: &str, amount: i64) -> Result<i64, String> {
446        let result = self.execute(
447            serde_json::json!({"cmd": "HINCRBY", "key": key, "field": field, "amount": amount}),
448        )?;
449        result
450            .get("result")
451            .and_then(|v| v.as_i64())
452            .ok_or_else(|| {
453                result
454                    .get("error")
455                    .and_then(|v| v.as_str())
456                    .unwrap_or("HINCRBY failed")
457                    .to_string()
458            })
459    }
460
461    // -----------------------------------------------------------------------
462    // Sorted set operations
463    // -----------------------------------------------------------------------
464
465    /// ZADD key score member
466    pub fn zadd(&self, key: &str, score: f64, member: &str) -> Result<(), String> {
467        self.execute(
468            serde_json::json!({"cmd": "ZADD", "key": key, "score": score, "member": member}),
469        )?;
470        Ok(())
471    }
472
473    /// ZREM key member
474    pub fn zrem(&self, key: &str, member: &str) -> Result<bool, String> {
475        let result =
476            self.execute(serde_json::json!({"cmd": "ZREM", "key": key, "member": member}))?;
477        Ok(result
478            .get("result")
479            .and_then(|v| v.as_bool())
480            .unwrap_or(false))
481    }
482
483    /// ZSCORE key member
484    pub fn zscore(&self, key: &str, member: &str) -> Result<Option<f64>, String> {
485        let result =
486            self.execute(serde_json::json!({"cmd": "ZSCORE", "key": key, "member": member}))?;
487        Ok(result
488            .get("result")
489            .and_then(|v| if v.is_null() { None } else { v.as_f64() }))
490    }
491
492    /// ZRANK key member
493    pub fn zrank(&self, key: &str, member: &str) -> Result<Option<usize>, String> {
494        let result =
495            self.execute(serde_json::json!({"cmd": "ZRANK", "key": key, "member": member}))?;
496        Ok(result.get("result").and_then(|v| {
497            if v.is_null() {
498                None
499            } else {
500                v.as_u64().map(|n| n as usize)
501            }
502        }))
503    }
504
505    /// ZRANGE key start stop
506    pub fn zrange(
507        &self,
508        key: &str,
509        start: usize,
510        stop: usize,
511    ) -> Result<Vec<(String, f64)>, String> {
512        let result = self.execute(
513            serde_json::json!({"cmd": "ZRANGE", "key": key, "start": start, "stop": stop}),
514        )?;
515        let mut entries = Vec::new();
516        if let Some(arr) = result.get("result").and_then(|v| v.as_array()) {
517            for item in arr {
518                if let Some(obj) = item.as_object() {
519                    let member = obj
520                        .get("member")
521                        .and_then(|v| v.as_str())
522                        .unwrap_or("")
523                        .to_string();
524                    let score = obj.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
525                    entries.push((member, score));
526                }
527            }
528        }
529        Ok(entries)
530    }
531
532    /// ZCARD key
533    pub fn zcard(&self, key: &str) -> Result<usize, String> {
534        let result = self.execute(serde_json::json!({"cmd": "ZCARD", "key": key}))?;
535        result
536            .get("result")
537            .and_then(|v| v.as_u64())
538            .map(|n| n as usize)
539            .ok_or_else(|| "ZCARD failed".into())
540    }
541
542    // -----------------------------------------------------------------------
543    // Utility operations
544    // -----------------------------------------------------------------------
545
546    /// KEYS pattern
547    pub fn keys(&self, pattern: &str) -> Result<Vec<String>, String> {
548        let result = self.execute(serde_json::json!({"cmd": "KEYS", "pattern": pattern}))?;
549        Ok(result
550            .get("result")
551            .and_then(|v| v.as_array())
552            .map(|arr| {
553                arr.iter()
554                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
555                    .collect()
556            })
557            .unwrap_or_default())
558    }
559
560    /// TTL key
561    pub fn ttl(&self, key: &str) -> Result<Option<u64>, String> {
562        let result = self.execute(serde_json::json!({"cmd": "TTL", "key": key}))?;
563        Ok(result
564            .get("result")
565            .and_then(|v| if v.is_null() { None } else { v.as_u64() }))
566    }
567
568    /// EXPIRE key seconds
569    pub fn expire(&self, key: &str, seconds: u64) -> Result<bool, String> {
570        let result =
571            self.execute(serde_json::json!({"cmd": "EXPIRE", "key": key, "seconds": seconds}))?;
572        Ok(result
573            .get("result")
574            .and_then(|v| v.as_bool())
575            .unwrap_or(false))
576    }
577
578    /// PERSIST key
579    pub fn persist(&self, key: &str) -> Result<bool, String> {
580        let result = self.execute(serde_json::json!({"cmd": "PERSIST", "key": key}))?;
581        Ok(result
582            .get("result")
583            .and_then(|v| v.as_bool())
584            .unwrap_or(false))
585    }
586
587    /// TYPE key
588    pub fn key_type(&self, key: &str) -> Result<String, String> {
589        let result = self.execute(serde_json::json!({"cmd": "TYPE", "key": key}))?;
590        Ok(result
591            .get("result")
592            .and_then(|v| v.as_str())
593            .unwrap_or("none")
594            .to_string())
595    }
596
597    /// DBSIZE
598    pub fn dbsize(&self) -> Result<usize, String> {
599        let result = self.execute(serde_json::json!({"cmd": "DBSIZE"}))?;
600        result
601            .get("result")
602            .and_then(|v| v.as_u64())
603            .map(|n| n as usize)
604            .ok_or_else(|| "DBSIZE failed".into())
605    }
606
607    /// FLUSHALL
608    pub fn flushall(&self) -> Result<(), String> {
609        self.execute(serde_json::json!({"cmd": "FLUSHALL"}))?;
610        Ok(())
611    }
612
613    /// INFO
614    pub fn info(&self) -> Result<serde_json::Value, String> {
615        let result = self.execute(serde_json::json!({"cmd": "INFO"}))?;
616        Ok(result
617            .get("result")
618            .cloned()
619            .unwrap_or(serde_json::json!({})))
620    }
621
622    // -----------------------------------------------------------------------
623    // Pub/Sub
624    // -----------------------------------------------------------------------
625
626    /// Publish a message to a channel on the remote cache server.
627    /// Returns the number of subscribers notified.
628    pub fn publish(&self, channel: &str, message: &str) -> Result<usize, String> {
629        let result = self.post(
630            "/pubsub/publish",
631            &serde_json::json!({"channel": channel, "message": message}),
632        )?;
633        Ok(result
634            .get("subscribers")
635            .and_then(|v| v.as_u64())
636            .unwrap_or(0) as usize)
637    }
638
639    /// List channels with subscriber counts.
640    pub fn channels(&self) -> Result<Vec<(String, usize)>, String> {
641        let result = self.get("/pubsub/channels")?;
642        let mut channels = Vec::new();
643        if let Some(arr) = result.get("result").and_then(|v| v.as_array()) {
644            for item in arr {
645                let ch = item
646                    .get("channel")
647                    .and_then(|v| v.as_str())
648                    .unwrap_or("")
649                    .to_string();
650                let count = item
651                    .get("subscribers")
652                    .and_then(|v| v.as_u64())
653                    .unwrap_or(0) as usize;
654                channels.push((ch, count));
655            }
656        }
657        Ok(channels)
658    }
659
660    /// Get message history for a channel.
661    pub fn history(&self, channel: &str, limit: usize) -> Result<Vec<serde_json::Value>, String> {
662        let path = format!("/pubsub/history/{channel}?limit={limit}");
663        let result = self.get(&path)?;
664        Ok(result
665            .get("result")
666            .and_then(|v| v.as_array())
667            .cloned()
668            .unwrap_or_default())
669    }
670
671    /// Health check -- returns the server's health response.
672    pub fn health(&self) -> Result<serde_json::Value, String> {
673        self.get("/health")
674    }
675}
676
677// ---------------------------------------------------------------------------
678// Tests
679// ---------------------------------------------------------------------------
680
681#[cfg(test)]
682mod tests {
683    use super::*;
684
685    #[test]
686    fn parse_base_url() {
687        let c = RemoteCacheClient::new("http://localhost:6380");
688        assert_eq!(c.host_port, "localhost:6380");
689    }
690
691    #[test]
692    fn parse_base_url_trailing_slash() {
693        let c = RemoteCacheClient::new("http://localhost:6380/");
694        assert_eq!(c.host_port, "localhost:6380");
695    }
696
697    #[test]
698    fn parse_base_url_no_scheme() {
699        let c = RemoteCacheClient::new("cache.internal:6380");
700        assert_eq!(c.host_port, "cache.internal:6380");
701    }
702
703    #[test]
704    fn parse_base_url_with_path() {
705        let c = RemoteCacheClient::new("http://localhost:6380/some/path");
706        assert_eq!(c.host_port, "localhost:6380");
707    }
708}