use std::sync::Arc;
use pylon_plugin::builtin::cache::CachePlugin;
use tiny_http::{Header, Method, Response, Server};
use crate::cache_handlers::{
handle_cache_command, handle_cache_delete, handle_cache_get, handle_pubsub_channels,
handle_pubsub_history, handle_pubsub_publish,
};
use crate::pubsub::PubSubBroker;
pub fn start_cache_server(port: u16, max_keys: usize, max_history: usize) -> Result<(), String> {
start_cache_server_with_options(port, max_keys, max_history, None, false)
}
pub fn start_cache_server_with_options(
port: u16,
max_keys: usize,
max_history: usize,
resp_port: Option<u16>,
resp_only: bool,
) -> Result<(), String> {
let cache = Arc::new(CachePlugin::new(max_keys));
let pubsub = Arc::new(PubSubBroker::new(max_history));
if let Some(rp) = resp_port {
let cache_for_resp = Arc::clone(&cache);
std::thread::spawn(move || {
crate::resp_server::start_resp_server(cache_for_resp, rp);
});
}
if resp_only {
let rp = resp_port.unwrap_or(6379);
tracing::warn!("[cache] RESP-only mode -- no HTTP server started");
if resp_port.is_none() {
crate::resp_server::start_resp_server(cache, rp);
} else {
loop {
std::thread::park();
}
}
return Ok(());
}
let addr = format!("0.0.0.0:{port}");
let server = Server::http(&addr).map_err(|e| format!("Failed to start cache server: {e}"))?;
tracing::warn!("pylon cache server listening on http://localhost:{port}");
tracing::warn!(" Cache: POST http://localhost:{port}/cache");
tracing::warn!(" PubSub: POST http://localhost:{port}/pubsub/publish");
tracing::warn!(" Health: GET http://localhost:{port}/health");
for mut request in server.incoming_requests() {
let cache = Arc::clone(&cache);
let pubsub = Arc::clone(&pubsub);
let mut body = String::new();
let _ = std::io::Read::read_to_string(request.as_reader(), &mut body);
let method = request.method().clone();
let url = request.url().to_string();
let (status, response_body) = route_request(&cache, &pubsub, &method, &url, &body);
let response = Response::from_string(&response_body)
.with_status_code(status)
.with_header(Header::from_bytes("Content-Type", "application/json").unwrap())
.with_header(Header::from_bytes("Access-Control-Allow-Origin", "*").unwrap());
let _ = request.respond(response);
}
Ok(())
}
fn route_request(
cache: &CachePlugin,
pubsub: &PubSubBroker,
method: &Method,
url: &str,
body: &str,
) -> (u16, String) {
if method.as_str() == "OPTIONS" {
return (204, String::new());
}
if url == "/health" && *method == Method::Get {
let info = cache.info();
return (
200,
serde_json::json!({
"status": "ok",
"mode": "standalone",
"keys": cache.dbsize(),
"stats": info,
})
.to_string(),
);
}
if url == "/cache" && *method == Method::Post {
return handle_cache_command(cache, body);
}
if let Some(key) = url.strip_prefix("/cache/") {
let key = key.split('?').next().unwrap_or(key);
if !key.is_empty() {
if *method == Method::Get {
return handle_cache_get(cache, key);
}
if *method == Method::Delete {
return handle_cache_delete(cache, key);
}
}
}
if url == "/pubsub/publish" && *method == Method::Post {
return handle_pubsub_publish(pubsub, body);
}
if url == "/pubsub/channels" && *method == Method::Get {
return handle_pubsub_channels(pubsub);
}
if let Some(channel) = url.strip_prefix("/pubsub/history/") {
let channel = channel.split('?').next().unwrap_or(channel);
if *method == Method::Get && !channel.is_empty() {
return handle_pubsub_history(pubsub, channel, url);
}
}
(
404,
serde_json::json!({
"error": {
"code": "NOT_FOUND",
"message": "Not found"
}
})
.to_string(),
)
}
#[cfg(test)]
mod tests {
use super::*;
fn setup() -> (CachePlugin, PubSubBroker) {
(CachePlugin::new(1000), PubSubBroker::new(100))
}
#[test]
fn health_check() {
let (cache, pubsub) = setup();
let (status, body) = route_request(&cache, &pubsub, &Method::Get, "/health", "");
assert_eq!(status, 200);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["status"], "ok");
assert_eq!(parsed["mode"], "standalone");
}
#[test]
fn cache_command_via_route() {
let (cache, pubsub) = setup();
let (status, _) = route_request(
&cache,
&pubsub,
&Method::Post,
"/cache",
r#"{"cmd": "SET", "key": "x", "value": "1"}"#,
);
assert_eq!(status, 200);
let (status, body) = route_request(&cache, &pubsub, &Method::Get, "/cache/x", "");
assert_eq!(status, 200);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["result"], "1");
}
#[test]
fn delete_via_route() {
let (cache, pubsub) = setup();
cache.set("del_me", "val", None);
let (status, _) = route_request(&cache, &pubsub, &Method::Delete, "/cache/del_me", "");
assert_eq!(status, 200);
assert!(cache.get("del_me").is_none());
}
#[test]
fn pubsub_publish_via_route() {
let (cache, pubsub) = setup();
let (status, body) = route_request(
&cache,
&pubsub,
&Method::Post,
"/pubsub/publish",
r#"{"channel": "test", "message": "hi"}"#,
);
assert_eq!(status, 200);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["ok"], true);
}
#[test]
fn pubsub_channels_via_route() {
let (cache, pubsub) = setup();
let (status, body) = route_request(&cache, &pubsub, &Method::Get, "/pubsub/channels", "");
assert_eq!(status, 200);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["ok"], true);
}
#[test]
fn pubsub_history_via_route() {
let (cache, pubsub) = setup();
pubsub.publish("events", "e1");
let (status, body) = route_request(
&cache,
&pubsub,
&Method::Get,
"/pubsub/history/events?limit=10",
"",
);
assert_eq!(status, 200);
let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(parsed["result"].as_array().unwrap().len(), 1);
}
#[test]
fn not_found() {
let (cache, pubsub) = setup();
let (status, _) = route_request(&cache, &pubsub, &Method::Get, "/nonexistent", "");
assert_eq!(status, 404);
}
#[test]
fn cors_preflight() {
let (cache, pubsub) = setup();
let (status, body) = route_request(&cache, &pubsub, &Method::Options, "/cache", "");
assert_eq!(status, 204);
assert!(body.is_empty());
}
}