use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
use crate::features::client::{embedded_driver, DriverConfig};
use crate::features::ops;
use super::http::{admin_authorized, parse_query_params, split_target, unix_now, write_response};
use super::{ServiceConfig, ServiceError};
pub fn serve(config: &ServiceConfig) -> Result<(), ServiceError> {
if config.listen_address.trim().is_empty() {
return Err(ServiceError::InvalidConfig(
"listen_address must not be empty".to_string(),
));
}
if config.telemetry_endpoint.trim().is_empty() {
return Err(ServiceError::InvalidConfig(
"telemetry_endpoint must not be empty".to_string(),
));
}
if !config.data_dir.is_absolute() && config.data_dir.as_os_str().is_empty() {
return Err(ServiceError::InvalidConfig(
"data_dir must not be empty".to_string(),
));
}
let listener = TcpListener::bind(&config.listen_address)?;
listener.set_nonblocking(false)?;
let mut served = 0usize;
while config.max_requests == 0 || served < config.max_requests {
let (mut stream, _) = listener.accept()?;
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
stream.set_write_timeout(Some(Duration::from_secs(2)))?;
let should_stop = handle_connection(&mut stream, config)?;
served += 1;
if should_stop {
break;
}
}
Ok(())
}
fn handle_connection(stream: &mut TcpStream, config: &ServiceConfig) -> Result<bool, ServiceError> {
let mut buffer = [0_u8; 8192];
let bytes_read = stream.read(&mut buffer)?;
if bytes_read == 0 {
return Ok(false);
}
let request = String::from_utf8_lossy(&buffer[..bytes_read]);
let mut lines = request.lines();
let request_line = lines.next().unwrap_or("");
let mut parts = request_line.split_whitespace();
let method = parts.next().unwrap_or("");
let target = parts.next().unwrap_or("/");
let headers = lines
.take_while(|line| !line.is_empty())
.filter_map(|line| line.split_once(':'))
.map(|(name, value)| (name.trim().to_ascii_lowercase(), value.trim().to_string()))
.collect::<std::collections::HashMap<String, String>>();
let (path, query) = split_target(target);
let needs_admin = path.starts_with("/admin/");
if needs_admin && !admin_authorized(config, &headers) {
write_response(stream, "401 Unauthorized", "text/plain", "unauthorized\n")?;
return Ok(false);
}
match (method, path) {
("GET", "/livez") => {
write_response(stream, "200 OK", "text/plain", "live\n")?;
Ok(false)
}
("GET", "/readyz") => {
let checked_at = unix_now();
let report = ops::service_report(Some(&config.listen_address), checked_at);
write_response(
stream,
"200 OK",
"application/json",
&report.to_json_pretty(),
)?;
Ok(false)
}
("GET", "/metrics") => {
write_response(
stream,
"200 OK",
"text/plain; version=0.0.4",
&ops::metrics_prometheus_text(),
)?;
Ok(false)
}
("GET", "/v1/query") => {
let params = parse_query_params(query);
let Some(query_text) = params.get("q") else {
write_response(stream, "400 Bad Request", "text/plain", "missing q\n")?;
return Ok(false);
};
let body = execute_query(&config.data_dir, query_text)?;
write_response(stream, "200 OK", "application/json", &body)?;
Ok(false)
}
("GET", "/admin/status") => {
let checked_at = unix_now();
let report = ops::service_report(Some(&config.listen_address), checked_at);
write_response(
stream,
"200 OK",
"application/json",
&report.to_json_pretty(),
)?;
Ok(false)
}
("GET", "/admin/lifecycle") => {
let params = parse_query_params(query);
let action = params.get("action").map(String::as_str).unwrap_or("status");
let body = format!(
concat!(
"{{\n",
" \"action\": \"{}\",\n",
" \"supported\": [\"status\", \"stop\"],\n",
" \"admin_token_required\": {}\n",
"}}\n"
),
action,
config.admin_token.is_some()
);
write_response(stream, "200 OK", "application/json", &body)?;
Ok(action == "stop")
}
_ => {
write_response(stream, "404 Not Found", "text/plain", "not found\n")?;
Ok(false)
}
}
}
fn execute_query(data_dir: &Path, query_text: &str) -> Result<String, ServiceError> {
let driver = embedded_driver(DriverConfig::embedded_data_dir(data_dir.to_path_buf()))?;
let rows = driver.query(query_text)?;
Ok(rows.to_json_pretty())
}