Skip to main content

pylon_runtime/
cache_server.rs

//! Standalone cache server.
//!
//! Runs a lightweight HTTP server that exposes only cache and pub/sub
//! endpoints. This allows the cache to be deployed independently of the
//! main pylon server for horizontal scaling.
//!
//! # Usage
//!
//! ```text
//! pylon cache --port 6380 --max-keys 100000 --max-history 100
//! ```
//!
//! # Endpoints
//!
//! - `POST /cache`              -- execute a cache command (same protocol as `/api/cache`)
//! - `GET  /cache/:key`         -- shorthand GET
//! - `DELETE /cache/:key`       -- shorthand DELETE
//! - `POST /pubsub/publish`     -- publish a message
//! - `GET  /pubsub/channels`    -- list channels
//! - `GET  /pubsub/history/:ch` -- channel history
//! - `GET  /health`             -- health check
22
23use std::sync::Arc;
24
25use pylon_plugin::builtin::cache::CachePlugin;
26use tiny_http::{Header, Method, Response, Server};
27
28use crate::cache_handlers::{
29    handle_cache_command, handle_cache_delete, handle_cache_get, handle_pubsub_channels,
30    handle_pubsub_history, handle_pubsub_publish,
31};
32use crate::pubsub::PubSubBroker;
33
34/// Start a standalone cache server on the given port.
35///
36/// This blocks the calling thread, serving requests in a synchronous loop.
37/// It runs independently of the main pylon server -- no auth, no entities,
38/// no sync. Just the cache and pub/sub.
39pub fn start_cache_server(port: u16, max_keys: usize, max_history: usize) -> Result<(), String> {
40    start_cache_server_with_options(port, max_keys, max_history, None, false)
41}
42
43/// Start a standalone cache server with optional RESP protocol support.
44///
45/// When `resp_port` is `Some(port)`, a RESP-compatible TCP server is also
46/// started on that port, allowing `redis-cli` and any Redis client library
47/// to connect directly.
48///
49/// When `resp_only` is `true`, only the RESP server is started (no HTTP).
50pub fn start_cache_server_with_options(
51    port: u16,
52    max_keys: usize,
53    max_history: usize,
54    resp_port: Option<u16>,
55    resp_only: bool,
56) -> Result<(), String> {
57    let cache = Arc::new(CachePlugin::new(max_keys));
58    let pubsub = Arc::new(PubSubBroker::new(max_history));
59
60    // Start the RESP server on a background thread if requested.
61    if let Some(rp) = resp_port {
62        let cache_for_resp = Arc::clone(&cache);
63        std::thread::spawn(move || {
64            crate::resp_server::start_resp_server(cache_for_resp, rp);
65        });
66    }
67
68    // If resp-only mode, block on the RESP server instead of HTTP.
69    if resp_only {
70        let rp = resp_port.unwrap_or(6379);
71        tracing::warn!("[cache] RESP-only mode -- no HTTP server started");
72        // The RESP server was already spawned above if resp_port was Some.
73        // If it was None (user said --resp-only without --resp-port), start
74        // it on the default port on this thread.
75        if resp_port.is_none() {
76            crate::resp_server::start_resp_server(cache, rp);
77        } else {
78            // Block forever so the process doesn't exit. The RESP server
79            // thread is doing the real work.
80            loop {
81                std::thread::park();
82            }
83        }
84        return Ok(());
85    }
86
87    // Start the HTTP server.
88    let addr = format!("0.0.0.0:{port}");
89    let server = Server::http(&addr).map_err(|e| format!("Failed to start cache server: {e}"))?;
90
91    tracing::warn!("pylon cache server listening on http://localhost:{port}");
92    tracing::warn!("  Cache:  POST http://localhost:{port}/cache");
93    tracing::warn!("  PubSub: POST http://localhost:{port}/pubsub/publish");
94    tracing::warn!("  Health: GET  http://localhost:{port}/health");
95
96    for mut request in server.incoming_requests() {
97        let cache = Arc::clone(&cache);
98        let pubsub = Arc::clone(&pubsub);
99
100        let mut body = String::new();
101        let _ = std::io::Read::read_to_string(request.as_reader(), &mut body);
102
103        let method = request.method().clone();
104        let url = request.url().to_string();
105
106        let (status, response_body) = route_request(&cache, &pubsub, &method, &url, &body);
107
108        let response = Response::from_string(&response_body)
109            .with_status_code(status)
110            .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
111            .with_header(Header::from_bytes("Access-Control-Allow-Origin", "*").unwrap());
112
113        let _ = request.respond(response);
114    }
115
116    Ok(())
117}
118
119/// Route a request to the appropriate handler.
120fn route_request(
121    cache: &CachePlugin,
122    pubsub: &PubSubBroker,
123    method: &Method,
124    url: &str,
125    body: &str,
126) -> (u16, String) {
127    // CORS preflight
128    if method.as_str() == "OPTIONS" {
129        return (204, String::new());
130    }
131
132    // Health check
133    if url == "/health" && *method == Method::Get {
134        let info = cache.info();
135        return (
136            200,
137            serde_json::json!({
138                "status": "ok",
139                "mode": "standalone",
140                "keys": cache.dbsize(),
141                "stats": info,
142            })
143            .to_string(),
144        );
145    }
146
147    // POST /cache -- execute a cache command
148    if url == "/cache" && *method == Method::Post {
149        return handle_cache_command(cache, body);
150    }
151
152    // GET or DELETE /cache/:key
153    if let Some(key) = url.strip_prefix("/cache/") {
154        let key = key.split('?').next().unwrap_or(key);
155        if !key.is_empty() {
156            if *method == Method::Get {
157                return handle_cache_get(cache, key);
158            }
159            if *method == Method::Delete {
160                return handle_cache_delete(cache, key);
161            }
162        }
163    }
164
165    // POST /pubsub/publish
166    if url == "/pubsub/publish" && *method == Method::Post {
167        return handle_pubsub_publish(pubsub, body);
168    }
169
170    // GET /pubsub/channels
171    if url == "/pubsub/channels" && *method == Method::Get {
172        return handle_pubsub_channels(pubsub);
173    }
174
175    // GET /pubsub/history/:channel
176    if let Some(channel) = url.strip_prefix("/pubsub/history/") {
177        let channel = channel.split('?').next().unwrap_or(channel);
178        if *method == Method::Get && !channel.is_empty() {
179            return handle_pubsub_history(pubsub, channel, url);
180        }
181    }
182
183    (
184        404,
185        serde_json::json!({
186            "error": {
187                "code": "NOT_FOUND",
188                "message": "Not found"
189            }
190        })
191        .to_string(),
192    )
193}
194
195// ---------------------------------------------------------------------------
196// Tests
197// ---------------------------------------------------------------------------
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a fresh cache and pub/sub broker for a single test.
    fn setup() -> (CachePlugin, PubSubBroker) {
        let cache = CachePlugin::new(1000);
        let pubsub = PubSubBroker::new(100);
        (cache, pubsub)
    }

    /// Decode a response body as JSON, panicking if it is malformed.
    fn json(body: &str) -> serde_json::Value {
        serde_json::from_str(body).unwrap()
    }

    /// `GET /health` reports an OK status in standalone mode.
    #[test]
    fn health_check() {
        let (cache, pubsub) = setup();

        let (status, body) = route_request(&cache, &pubsub, &Method::Get, "/health", "");

        assert_eq!(status, 200);
        let payload = json(&body);
        assert_eq!(payload["status"], "ok");
        assert_eq!(payload["mode"], "standalone");
    }

    /// A value stored through `POST /cache` is readable through `GET /cache/:key`.
    #[test]
    fn cache_command_via_route() {
        let (cache, pubsub) = setup();

        let set_command = r#"{"cmd": "SET", "key": "x", "value": "1"}"#;
        let (set_status, _) =
            route_request(&cache, &pubsub, &Method::Post, "/cache", set_command);
        assert_eq!(set_status, 200);

        let (get_status, get_body) =
            route_request(&cache, &pubsub, &Method::Get, "/cache/x", "");
        assert_eq!(get_status, 200);
        assert_eq!(json(&get_body)["result"], "1");
    }

    /// `DELETE /cache/:key` removes the entry from the cache.
    #[test]
    fn delete_via_route() {
        let (cache, pubsub) = setup();
        cache.set("del_me", "val", None);

        let (status, _) = route_request(&cache, &pubsub, &Method::Delete, "/cache/del_me", "");

        assert_eq!(status, 200);
        assert!(cache.get("del_me").is_none());
    }

    /// `POST /pubsub/publish` accepts a message and reports success.
    #[test]
    fn pubsub_publish_via_route() {
        let (cache, pubsub) = setup();

        let message = r#"{"channel": "test", "message": "hi"}"#;
        let (status, body) =
            route_request(&cache, &pubsub, &Method::Post, "/pubsub/publish", message);

        assert_eq!(status, 200);
        assert_eq!(json(&body)["ok"], true);
    }

    /// `GET /pubsub/channels` succeeds even when nothing has been published.
    #[test]
    fn pubsub_channels_via_route() {
        let (cache, pubsub) = setup();

        let (status, body) = route_request(&cache, &pubsub, &Method::Get, "/pubsub/channels", "");

        assert_eq!(status, 200);
        assert_eq!(json(&body)["ok"], true);
    }

    /// `GET /pubsub/history/:channel` returns previously published messages,
    /// with a query string present on the URL.
    #[test]
    fn pubsub_history_via_route() {
        let (cache, pubsub) = setup();
        pubsub.publish("events", "e1");

        let target = "/pubsub/history/events?limit=10";
        let (status, body) = route_request(&cache, &pubsub, &Method::Get, target, "");

        assert_eq!(status, 200);
        let payload = json(&body);
        assert_eq!(payload["result"].as_array().unwrap().len(), 1);
    }

    /// Unknown paths produce a 404.
    #[test]
    fn not_found() {
        let (cache, pubsub) = setup();

        let (status, _) = route_request(&cache, &pubsub, &Method::Get, "/nonexistent", "");

        assert_eq!(status, 404);
    }

    /// `OPTIONS` requests get an empty 204 response.
    #[test]
    fn cors_preflight() {
        let (cache, pubsub) = setup();

        let (status, body) = route_request(&cache, &pubsub, &Method::Options, "/cache", "");

        assert_eq!(status, 204);
        assert!(body.is_empty());
    }
}
298}