1use std::collections::HashMap;
19use std::io::{Read, Write};
20use std::net::TcpStream;
21use std::time::Duration;
22
/// Blocking HTTP client for a remote cache service.
///
/// The client stores only the server's `host:port` authority; it keeps no
/// open connection between calls.
#[derive(Debug, Clone)]
pub struct RemoteCacheClient {
    /// `host:port` used as the TCP connect target and as the `Host` header.
    host_port: String,
}
32
33impl RemoteCacheClient {
34 pub fn new(base_url: &str) -> Self {
39 let stripped = base_url.trim_end_matches('/');
40 let host_port = stripped
41 .strip_prefix("http://")
42 .unwrap_or(stripped)
43 .split('/')
44 .next()
45 .unwrap_or(stripped)
46 .to_string();
47 Self { host_port }
48 }
49
50 fn post(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value, String> {
53 let payload = body.to_string();
54 let request = format!(
55 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
56 path, self.host_port, payload.len(), payload
57 );
58
59 let mut stream = TcpStream::connect(&self.host_port)
60 .map_err(|e| format!("Cache connection to {} failed: {e}", self.host_port))?;
61 stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
62 stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
63
64 stream
65 .write_all(request.as_bytes())
66 .map_err(|e| format!("Write failed: {e}"))?;
67
68 stream.shutdown(std::net::Shutdown::Write).ok();
70
71 let mut response = String::new();
72 stream
73 .read_to_string(&mut response)
74 .map_err(|e| format!("Read failed: {e}"))?;
75
76 let body_str = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
78 serde_json::from_str(body_str).map_err(|e| format!("Parse failed: {e}"))
79 }
80
81 fn get(&self, path: &str) -> Result<serde_json::Value, String> {
83 let request = format!(
84 "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
85 path, self.host_port
86 );
87
88 let mut stream = TcpStream::connect(&self.host_port)
89 .map_err(|e| format!("Cache connection to {} failed: {e}", self.host_port))?;
90 stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
91
92 stream
93 .write_all(request.as_bytes())
94 .map_err(|e| format!("Write failed: {e}"))?;
95 stream.shutdown(std::net::Shutdown::Write).ok();
96
97 let mut response = String::new();
98 stream
99 .read_to_string(&mut response)
100 .map_err(|e| format!("Read failed: {e}"))?;
101
102 let body_str = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
103 serde_json::from_str(body_str).map_err(|e| format!("Parse failed: {e}"))
104 }
105
106 fn execute(&self, cmd: serde_json::Value) -> Result<serde_json::Value, String> {
108 self.post("/cache", &cmd)
109 }
110
111 pub fn set(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<(), String> {
117 let mut cmd = serde_json::json!({"cmd": "SET", "key": key, "value": value});
118 if let Some(t) = ttl {
119 cmd["ttl"] = serde_json::json!(t);
120 }
121 self.execute(cmd)?;
122 Ok(())
123 }
124
125 pub fn get_key(&self, key: &str) -> Result<Option<String>, String> {
127 let result = self.execute(serde_json::json!({"cmd": "GET", "key": key}))?;
128 Ok(result.get("result").and_then(|v| {
129 if v.is_null() {
130 None
131 } else {
132 v.as_str().map(|s| s.to_string())
133 }
134 }))
135 }
136
137 pub fn del(&self, key: &str) -> Result<bool, String> {
139 let result = self.execute(serde_json::json!({"cmd": "DEL", "key": key}))?;
140 Ok(result
141 .get("result")
142 .and_then(|v| v.as_bool())
143 .unwrap_or(false))
144 }
145
146 pub fn exists(&self, key: &str) -> Result<bool, String> {
148 let result = self.execute(serde_json::json!({"cmd": "EXISTS", "key": key}))?;
149 Ok(result
150 .get("result")
151 .and_then(|v| v.as_bool())
152 .unwrap_or(false))
153 }
154
155 pub fn incr(&self, key: &str) -> Result<i64, String> {
157 let result = self.execute(serde_json::json!({"cmd": "INCR", "key": key}))?;
158 result
159 .get("result")
160 .and_then(|v| v.as_i64())
161 .ok_or_else(|| {
162 result
163 .get("error")
164 .and_then(|v| v.as_str())
165 .unwrap_or("INCR failed")
166 .to_string()
167 })
168 }
169
170 pub fn decr(&self, key: &str) -> Result<i64, String> {
172 let result = self.execute(serde_json::json!({"cmd": "DECR", "key": key}))?;
173 result
174 .get("result")
175 .and_then(|v| v.as_i64())
176 .ok_or_else(|| {
177 result
178 .get("error")
179 .and_then(|v| v.as_str())
180 .unwrap_or("DECR failed")
181 .to_string()
182 })
183 }
184
185 pub fn incrby(&self, key: &str, amount: i64) -> Result<i64, String> {
187 let result =
188 self.execute(serde_json::json!({"cmd": "INCRBY", "key": key, "amount": amount}))?;
189 result
190 .get("result")
191 .and_then(|v| v.as_i64())
192 .ok_or_else(|| {
193 result
194 .get("error")
195 .and_then(|v| v.as_str())
196 .unwrap_or("INCRBY failed")
197 .to_string()
198 })
199 }
200
201 pub fn setnx(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<bool, String> {
203 let mut cmd = serde_json::json!({"cmd": "SETNX", "key": key, "value": value});
204 if let Some(t) = ttl {
205 cmd["ttl"] = serde_json::json!(t);
206 }
207 let result = self.execute(cmd)?;
208 Ok(result
209 .get("result")
210 .and_then(|v| v.as_bool())
211 .unwrap_or(false))
212 }
213
214 pub fn getset(&self, key: &str, value: &str) -> Result<Option<String>, String> {
216 let result =
217 self.execute(serde_json::json!({"cmd": "GETSET", "key": key, "value": value}))?;
218 Ok(result.get("result").and_then(|v| {
219 if v.is_null() {
220 None
221 } else {
222 v.as_str().map(|s| s.to_string())
223 }
224 }))
225 }
226
227 pub fn lpush(&self, key: &str, value: &str) -> Result<usize, String> {
233 let result =
234 self.execute(serde_json::json!({"cmd": "LPUSH", "key": key, "value": value}))?;
235 result
236 .get("result")
237 .and_then(|v| v.as_u64())
238 .map(|n| n as usize)
239 .ok_or_else(|| "LPUSH failed".into())
240 }
241
242 pub fn rpush(&self, key: &str, value: &str) -> Result<usize, String> {
244 let result =
245 self.execute(serde_json::json!({"cmd": "RPUSH", "key": key, "value": value}))?;
246 result
247 .get("result")
248 .and_then(|v| v.as_u64())
249 .map(|n| n as usize)
250 .ok_or_else(|| "RPUSH failed".into())
251 }
252
253 pub fn lpop(&self, key: &str) -> Result<Option<String>, String> {
255 let result = self.execute(serde_json::json!({"cmd": "LPOP", "key": key}))?;
256 Ok(result.get("result").and_then(|v| {
257 if v.is_null() {
258 None
259 } else {
260 v.as_str().map(|s| s.to_string())
261 }
262 }))
263 }
264
265 pub fn rpop(&self, key: &str) -> Result<Option<String>, String> {
267 let result = self.execute(serde_json::json!({"cmd": "RPOP", "key": key}))?;
268 Ok(result.get("result").and_then(|v| {
269 if v.is_null() {
270 None
271 } else {
272 v.as_str().map(|s| s.to_string())
273 }
274 }))
275 }
276
277 pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Result<Vec<String>, String> {
279 let result = self.execute(
280 serde_json::json!({"cmd": "LRANGE", "key": key, "start": start, "stop": stop}),
281 )?;
282 Ok(result
283 .get("result")
284 .and_then(|v| v.as_array())
285 .map(|arr| {
286 arr.iter()
287 .filter_map(|v| v.as_str().map(|s| s.to_string()))
288 .collect()
289 })
290 .unwrap_or_default())
291 }
292
293 pub fn llen(&self, key: &str) -> Result<usize, String> {
295 let result = self.execute(serde_json::json!({"cmd": "LLEN", "key": key}))?;
296 result
297 .get("result")
298 .and_then(|v| v.as_u64())
299 .map(|n| n as usize)
300 .ok_or_else(|| "LLEN failed".into())
301 }
302
303 pub fn sadd(&self, key: &str, member: &str) -> Result<bool, String> {
309 let result =
310 self.execute(serde_json::json!({"cmd": "SADD", "key": key, "member": member}))?;
311 Ok(result
312 .get("result")
313 .and_then(|v| v.as_bool())
314 .unwrap_or(false))
315 }
316
317 pub fn srem(&self, key: &str, member: &str) -> Result<bool, String> {
319 let result =
320 self.execute(serde_json::json!({"cmd": "SREM", "key": key, "member": member}))?;
321 Ok(result
322 .get("result")
323 .and_then(|v| v.as_bool())
324 .unwrap_or(false))
325 }
326
327 pub fn smembers(&self, key: &str) -> Result<Vec<String>, String> {
329 let result = self.execute(serde_json::json!({"cmd": "SMEMBERS", "key": key}))?;
330 Ok(result
331 .get("result")
332 .and_then(|v| v.as_array())
333 .map(|arr| {
334 arr.iter()
335 .filter_map(|v| v.as_str().map(|s| s.to_string()))
336 .collect()
337 })
338 .unwrap_or_default())
339 }
340
341 pub fn sismember(&self, key: &str, member: &str) -> Result<bool, String> {
343 let result =
344 self.execute(serde_json::json!({"cmd": "SISMEMBER", "key": key, "member": member}))?;
345 Ok(result
346 .get("result")
347 .and_then(|v| v.as_bool())
348 .unwrap_or(false))
349 }
350
351 pub fn scard(&self, key: &str) -> Result<usize, String> {
353 let result = self.execute(serde_json::json!({"cmd": "SCARD", "key": key}))?;
354 result
355 .get("result")
356 .and_then(|v| v.as_u64())
357 .map(|n| n as usize)
358 .ok_or_else(|| "SCARD failed".into())
359 }
360
361 pub fn hset(&self, key: &str, field: &str, value: &str) -> Result<(), String> {
367 self.execute(
368 serde_json::json!({"cmd": "HSET", "key": key, "field": field, "value": value}),
369 )?;
370 Ok(())
371 }
372
373 pub fn hget(&self, key: &str, field: &str) -> Result<Option<String>, String> {
375 let result =
376 self.execute(serde_json::json!({"cmd": "HGET", "key": key, "field": field}))?;
377 Ok(result.get("result").and_then(|v| {
378 if v.is_null() {
379 None
380 } else {
381 v.as_str().map(|s| s.to_string())
382 }
383 }))
384 }
385
386 pub fn hdel(&self, key: &str, field: &str) -> Result<bool, String> {
388 let result =
389 self.execute(serde_json::json!({"cmd": "HDEL", "key": key, "field": field}))?;
390 Ok(result
391 .get("result")
392 .and_then(|v| v.as_bool())
393 .unwrap_or(false))
394 }
395
396 pub fn hgetall(&self, key: &str) -> Result<HashMap<String, String>, String> {
398 let result = self.execute(serde_json::json!({"cmd": "HGETALL", "key": key}))?;
399 let mut map = HashMap::new();
400 if let Some(obj) = result.get("result").and_then(|v| v.as_object()) {
401 for (k, v) in obj {
402 if let Some(s) = v.as_str() {
403 map.insert(k.clone(), s.to_string());
404 }
405 }
406 }
407 Ok(map)
408 }
409
410 pub fn hexists(&self, key: &str, field: &str) -> Result<bool, String> {
412 let result =
413 self.execute(serde_json::json!({"cmd": "HEXISTS", "key": key, "field": field}))?;
414 Ok(result
415 .get("result")
416 .and_then(|v| v.as_bool())
417 .unwrap_or(false))
418 }
419
420 pub fn hlen(&self, key: &str) -> Result<usize, String> {
422 let result = self.execute(serde_json::json!({"cmd": "HLEN", "key": key}))?;
423 result
424 .get("result")
425 .and_then(|v| v.as_u64())
426 .map(|n| n as usize)
427 .ok_or_else(|| "HLEN failed".into())
428 }
429
430 pub fn hkeys(&self, key: &str) -> Result<Vec<String>, String> {
432 let result = self.execute(serde_json::json!({"cmd": "HKEYS", "key": key}))?;
433 Ok(result
434 .get("result")
435 .and_then(|v| v.as_array())
436 .map(|arr| {
437 arr.iter()
438 .filter_map(|v| v.as_str().map(|s| s.to_string()))
439 .collect()
440 })
441 .unwrap_or_default())
442 }
443
444 pub fn hincrby(&self, key: &str, field: &str, amount: i64) -> Result<i64, String> {
446 let result = self.execute(
447 serde_json::json!({"cmd": "HINCRBY", "key": key, "field": field, "amount": amount}),
448 )?;
449 result
450 .get("result")
451 .and_then(|v| v.as_i64())
452 .ok_or_else(|| {
453 result
454 .get("error")
455 .and_then(|v| v.as_str())
456 .unwrap_or("HINCRBY failed")
457 .to_string()
458 })
459 }
460
461 pub fn zadd(&self, key: &str, score: f64, member: &str) -> Result<(), String> {
467 self.execute(
468 serde_json::json!({"cmd": "ZADD", "key": key, "score": score, "member": member}),
469 )?;
470 Ok(())
471 }
472
473 pub fn zrem(&self, key: &str, member: &str) -> Result<bool, String> {
475 let result =
476 self.execute(serde_json::json!({"cmd": "ZREM", "key": key, "member": member}))?;
477 Ok(result
478 .get("result")
479 .and_then(|v| v.as_bool())
480 .unwrap_or(false))
481 }
482
483 pub fn zscore(&self, key: &str, member: &str) -> Result<Option<f64>, String> {
485 let result =
486 self.execute(serde_json::json!({"cmd": "ZSCORE", "key": key, "member": member}))?;
487 Ok(result
488 .get("result")
489 .and_then(|v| if v.is_null() { None } else { v.as_f64() }))
490 }
491
492 pub fn zrank(&self, key: &str, member: &str) -> Result<Option<usize>, String> {
494 let result =
495 self.execute(serde_json::json!({"cmd": "ZRANK", "key": key, "member": member}))?;
496 Ok(result.get("result").and_then(|v| {
497 if v.is_null() {
498 None
499 } else {
500 v.as_u64().map(|n| n as usize)
501 }
502 }))
503 }
504
505 pub fn zrange(
507 &self,
508 key: &str,
509 start: usize,
510 stop: usize,
511 ) -> Result<Vec<(String, f64)>, String> {
512 let result = self.execute(
513 serde_json::json!({"cmd": "ZRANGE", "key": key, "start": start, "stop": stop}),
514 )?;
515 let mut entries = Vec::new();
516 if let Some(arr) = result.get("result").and_then(|v| v.as_array()) {
517 for item in arr {
518 if let Some(obj) = item.as_object() {
519 let member = obj
520 .get("member")
521 .and_then(|v| v.as_str())
522 .unwrap_or("")
523 .to_string();
524 let score = obj.get("score").and_then(|v| v.as_f64()).unwrap_or(0.0);
525 entries.push((member, score));
526 }
527 }
528 }
529 Ok(entries)
530 }
531
532 pub fn zcard(&self, key: &str) -> Result<usize, String> {
534 let result = self.execute(serde_json::json!({"cmd": "ZCARD", "key": key}))?;
535 result
536 .get("result")
537 .and_then(|v| v.as_u64())
538 .map(|n| n as usize)
539 .ok_or_else(|| "ZCARD failed".into())
540 }
541
542 pub fn keys(&self, pattern: &str) -> Result<Vec<String>, String> {
548 let result = self.execute(serde_json::json!({"cmd": "KEYS", "pattern": pattern}))?;
549 Ok(result
550 .get("result")
551 .and_then(|v| v.as_array())
552 .map(|arr| {
553 arr.iter()
554 .filter_map(|v| v.as_str().map(|s| s.to_string()))
555 .collect()
556 })
557 .unwrap_or_default())
558 }
559
560 pub fn ttl(&self, key: &str) -> Result<Option<u64>, String> {
562 let result = self.execute(serde_json::json!({"cmd": "TTL", "key": key}))?;
563 Ok(result
564 .get("result")
565 .and_then(|v| if v.is_null() { None } else { v.as_u64() }))
566 }
567
568 pub fn expire(&self, key: &str, seconds: u64) -> Result<bool, String> {
570 let result =
571 self.execute(serde_json::json!({"cmd": "EXPIRE", "key": key, "seconds": seconds}))?;
572 Ok(result
573 .get("result")
574 .and_then(|v| v.as_bool())
575 .unwrap_or(false))
576 }
577
578 pub fn persist(&self, key: &str) -> Result<bool, String> {
580 let result = self.execute(serde_json::json!({"cmd": "PERSIST", "key": key}))?;
581 Ok(result
582 .get("result")
583 .and_then(|v| v.as_bool())
584 .unwrap_or(false))
585 }
586
587 pub fn key_type(&self, key: &str) -> Result<String, String> {
589 let result = self.execute(serde_json::json!({"cmd": "TYPE", "key": key}))?;
590 Ok(result
591 .get("result")
592 .and_then(|v| v.as_str())
593 .unwrap_or("none")
594 .to_string())
595 }
596
597 pub fn dbsize(&self) -> Result<usize, String> {
599 let result = self.execute(serde_json::json!({"cmd": "DBSIZE"}))?;
600 result
601 .get("result")
602 .and_then(|v| v.as_u64())
603 .map(|n| n as usize)
604 .ok_or_else(|| "DBSIZE failed".into())
605 }
606
607 pub fn flushall(&self) -> Result<(), String> {
609 self.execute(serde_json::json!({"cmd": "FLUSHALL"}))?;
610 Ok(())
611 }
612
613 pub fn info(&self) -> Result<serde_json::Value, String> {
615 let result = self.execute(serde_json::json!({"cmd": "INFO"}))?;
616 Ok(result
617 .get("result")
618 .cloned()
619 .unwrap_or(serde_json::json!({})))
620 }
621
622 pub fn publish(&self, channel: &str, message: &str) -> Result<usize, String> {
629 let result = self.post(
630 "/pubsub/publish",
631 &serde_json::json!({"channel": channel, "message": message}),
632 )?;
633 Ok(result
634 .get("subscribers")
635 .and_then(|v| v.as_u64())
636 .unwrap_or(0) as usize)
637 }
638
639 pub fn channels(&self) -> Result<Vec<(String, usize)>, String> {
641 let result = self.get("/pubsub/channels")?;
642 let mut channels = Vec::new();
643 if let Some(arr) = result.get("result").and_then(|v| v.as_array()) {
644 for item in arr {
645 let ch = item
646 .get("channel")
647 .and_then(|v| v.as_str())
648 .unwrap_or("")
649 .to_string();
650 let count = item
651 .get("subscribers")
652 .and_then(|v| v.as_u64())
653 .unwrap_or(0) as usize;
654 channels.push((ch, count));
655 }
656 }
657 Ok(channels)
658 }
659
660 pub fn history(&self, channel: &str, limit: usize) -> Result<Vec<serde_json::Value>, String> {
662 let path = format!("/pubsub/history/{channel}?limit={limit}");
663 let result = self.get(&path)?;
664 Ok(result
665 .get("result")
666 .and_then(|v| v.as_array())
667 .cloned()
668 .unwrap_or_default())
669 }
670
671 pub fn health(&self) -> Result<serde_json::Value, String> {
673 self.get("/health")
674 }
675}
676
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the `host:port` the client derives from `url`.
    fn parsed(url: &str) -> String {
        RemoteCacheClient::new(url).host_port
    }

    /// A plain `http://host:port` URL keeps just the authority.
    #[test]
    fn parse_base_url() {
        assert_eq!(parsed("http://localhost:6380"), "localhost:6380");
    }

    /// A trailing slash is dropped.
    #[test]
    fn parse_base_url_trailing_slash() {
        assert_eq!(parsed("http://localhost:6380/"), "localhost:6380");
    }

    /// An input without a scheme is used as-is.
    #[test]
    fn parse_base_url_no_scheme() {
        assert_eq!(parsed("cache.internal:6380"), "cache.internal:6380");
    }

    /// Any path after the authority is discarded.
    #[test]
    fn parse_base_url_with_path() {
        assert_eq!(parsed("http://localhost:6380/some/path"), "localhost:6380");
    }
}