1use std::sync::Arc;
24
25use pylon_plugin::builtin::cache::CachePlugin;
26use tiny_http::{Header, Method, Response, Server};
27
28use crate::cache_handlers::{
29 handle_cache_command, handle_cache_delete, handle_cache_get, handle_pubsub_channels,
30 handle_pubsub_history, handle_pubsub_publish,
31};
32use crate::pubsub::PubSubBroker;
33
34pub fn start_cache_server(port: u16, max_keys: usize, max_history: usize) -> Result<(), String> {
40 start_cache_server_with_options(port, max_keys, max_history, None, false)
41}
42
43pub fn start_cache_server_with_options(
51 port: u16,
52 max_keys: usize,
53 max_history: usize,
54 resp_port: Option<u16>,
55 resp_only: bool,
56) -> Result<(), String> {
57 let cache = Arc::new(CachePlugin::new(max_keys));
58 let pubsub = Arc::new(PubSubBroker::new(max_history));
59
60 if let Some(rp) = resp_port {
62 let cache_for_resp = Arc::clone(&cache);
63 std::thread::spawn(move || {
64 crate::resp_server::start_resp_server(cache_for_resp, rp);
65 });
66 }
67
68 if resp_only {
70 let rp = resp_port.unwrap_or(6379);
71 tracing::warn!("[cache] RESP-only mode -- no HTTP server started");
72 if resp_port.is_none() {
76 crate::resp_server::start_resp_server(cache, rp);
77 } else {
78 loop {
81 std::thread::park();
82 }
83 }
84 return Ok(());
85 }
86
87 let addr = format!("0.0.0.0:{port}");
89 let server = Server::http(&addr).map_err(|e| format!("Failed to start cache server: {e}"))?;
90
91 tracing::warn!("pylon cache server listening on http://localhost:{port}");
92 tracing::warn!(" Cache: POST http://localhost:{port}/cache");
93 tracing::warn!(" PubSub: POST http://localhost:{port}/pubsub/publish");
94 tracing::warn!(" Health: GET http://localhost:{port}/health");
95
96 for mut request in server.incoming_requests() {
97 let cache = Arc::clone(&cache);
98 let pubsub = Arc::clone(&pubsub);
99
100 let mut body = String::new();
101 let _ = std::io::Read::read_to_string(request.as_reader(), &mut body);
102
103 let method = request.method().clone();
104 let url = request.url().to_string();
105
106 let (status, response_body) = route_request(&cache, &pubsub, &method, &url, &body);
107
108 let response = Response::from_string(&response_body)
109 .with_status_code(status)
110 .with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
111 .with_header(Header::from_bytes("Access-Control-Allow-Origin", "*").unwrap());
112
113 let _ = request.respond(response);
114 }
115
116 Ok(())
117}
118
119fn route_request(
121 cache: &CachePlugin,
122 pubsub: &PubSubBroker,
123 method: &Method,
124 url: &str,
125 body: &str,
126) -> (u16, String) {
127 if method.as_str() == "OPTIONS" {
129 return (204, String::new());
130 }
131
132 if url == "/health" && *method == Method::Get {
134 let info = cache.info();
135 return (
136 200,
137 serde_json::json!({
138 "status": "ok",
139 "mode": "standalone",
140 "keys": cache.dbsize(),
141 "stats": info,
142 })
143 .to_string(),
144 );
145 }
146
147 if url == "/cache" && *method == Method::Post {
149 return handle_cache_command(cache, body);
150 }
151
152 if let Some(key) = url.strip_prefix("/cache/") {
154 let key = key.split('?').next().unwrap_or(key);
155 if !key.is_empty() {
156 if *method == Method::Get {
157 return handle_cache_get(cache, key);
158 }
159 if *method == Method::Delete {
160 return handle_cache_delete(cache, key);
161 }
162 }
163 }
164
165 if url == "/pubsub/publish" && *method == Method::Post {
167 return handle_pubsub_publish(pubsub, body);
168 }
169
170 if url == "/pubsub/channels" && *method == Method::Get {
172 return handle_pubsub_channels(pubsub);
173 }
174
175 if let Some(channel) = url.strip_prefix("/pubsub/history/") {
177 let channel = channel.split('?').next().unwrap_or(channel);
178 if *method == Method::Get && !channel.is_empty() {
179 return handle_pubsub_history(pubsub, channel, url);
180 }
181 }
182
183 (
184 404,
185 serde_json::json!({
186 "error": {
187 "code": "NOT_FOUND",
188 "message": "Not found"
189 }
190 })
191 .to_string(),
192 )
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh cache and broker so tests never share state.
    fn setup() -> (CachePlugin, PubSubBroker) {
        let cache = CachePlugin::new(1000);
        let broker = PubSubBroker::new(100);
        (cache, broker)
    }

    /// Parses a handler response body, failing the test on malformed JSON.
    fn json(body: &str) -> serde_json::Value {
        serde_json::from_str(body).unwrap()
    }

    /// Routes a body-less GET request.
    fn get(cache: &CachePlugin, pubsub: &PubSubBroker, url: &str) -> (u16, String) {
        route_request(cache, pubsub, &Method::Get, url, "")
    }

    /// Routes a POST request carrying `body`.
    fn post(cache: &CachePlugin, pubsub: &PubSubBroker, url: &str, body: &str) -> (u16, String) {
        route_request(cache, pubsub, &Method::Post, url, body)
    }

    #[test]
    fn health_check() {
        let (cache, pubsub) = setup();
        let (status, body) = get(&cache, &pubsub, "/health");
        assert_eq!(status, 200);
        let doc = json(&body);
        assert_eq!(doc["status"], "ok");
        assert_eq!(doc["mode"], "standalone");
    }

    #[test]
    fn cache_command_via_route() {
        let (cache, pubsub) = setup();

        // Write through the command endpoint ...
        let set_cmd = r#"{"cmd": "SET", "key": "x", "value": "1"}"#;
        let (set_status, _) = post(&cache, &pubsub, "/cache", set_cmd);
        assert_eq!(set_status, 200);

        // ... and read the value back through the key endpoint.
        let (get_status, body) = get(&cache, &pubsub, "/cache/x");
        assert_eq!(get_status, 200);
        assert_eq!(json(&body)["result"], "1");
    }

    #[test]
    fn delete_via_route() {
        let (cache, pubsub) = setup();
        cache.set("del_me", "val", None);
        let (status, _) = route_request(&cache, &pubsub, &Method::Delete, "/cache/del_me", "");
        assert_eq!(status, 200);
        assert!(cache.get("del_me").is_none());
    }

    #[test]
    fn pubsub_publish_via_route() {
        let (cache, pubsub) = setup();
        let message = r#"{"channel": "test", "message": "hi"}"#;
        let (status, body) = post(&cache, &pubsub, "/pubsub/publish", message);
        assert_eq!(status, 200);
        assert_eq!(json(&body)["ok"], true);
    }

    #[test]
    fn pubsub_channels_via_route() {
        let (cache, pubsub) = setup();
        let (status, body) = get(&cache, &pubsub, "/pubsub/channels");
        assert_eq!(status, 200);
        assert_eq!(json(&body)["ok"], true);
    }

    #[test]
    fn pubsub_history_via_route() {
        let (cache, pubsub) = setup();
        pubsub.publish("events", "e1");
        let (status, body) = get(&cache, &pubsub, "/pubsub/history/events?limit=10");
        assert_eq!(status, 200);
        let doc = json(&body);
        assert_eq!(doc["result"].as_array().unwrap().len(), 1);
    }

    #[test]
    fn not_found() {
        let (cache, pubsub) = setup();
        let (status, _) = get(&cache, &pubsub, "/nonexistent");
        assert_eq!(status, 404);
    }

    #[test]
    fn cors_preflight() {
        let (cache, pubsub) = setup();
        let (status, body) = route_request(&cache, &pubsub, &Method::Options, "/cache", "");
        assert_eq!(status, 204);
        assert!(body.is_empty());
    }
}