iridium-db 0.3.0

A high-performance vector-graph hybrid storage and indexing engine
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;

use crate::features::client::{embedded_driver, DriverConfig};
use crate::features::ops;

use super::http::{admin_authorized, parse_query_params, split_target, unix_now, write_response};
use super::{ServiceConfig, ServiceError};

/// Runs the blocking HTTP service loop described by `config`.
///
/// After validating the configuration, a `TcpListener` is bound to
/// `config.listen_address` and accepted connections are handled one at a
/// time on the calling thread, each with 2-second read and write timeouts.
/// The loop ends once `config.max_requests` connections have been handled
/// (`0` disables the limit) or as soon as `handle_connection` returns `true`.
///
/// # Errors
///
/// Returns `ServiceError::InvalidConfig` when `listen_address` or
/// `telemetry_endpoint` is blank, or when `data_dir` is empty. Failures from
/// binding, accepting, configuring a socket, or any error returned by
/// `handle_connection` are propagated and end the loop.
pub fn serve(config: &ServiceConfig) -> Result<(), ServiceError> {
    // Checked in order; the first missing setting decides the error message.
    // An empty path is never absolute, so testing for emptiness alone is
    // sufficient for `data_dir`.
    let missing_settings = [
        (
            config.listen_address.trim().is_empty(),
            "listen_address must not be empty",
        ),
        (
            config.telemetry_endpoint.trim().is_empty(),
            "telemetry_endpoint must not be empty",
        ),
        (
            config.data_dir.as_os_str().is_empty(),
            "data_dir must not be empty",
        ),
    ];
    if let Some(&(_, message)) = missing_settings.iter().find(|(is_missing, _)| *is_missing) {
        return Err(ServiceError::InvalidConfig(message.to_string()));
    }

    let listener = TcpListener::bind(&config.listen_address)?;
    listener.set_nonblocking(false)?;

    let mut handled = 0usize;
    loop {
        // A limit of zero means "serve until asked to stop".
        let limit_reached = config.max_requests != 0 && handled >= config.max_requests;
        if limit_reached {
            break;
        }

        let (mut stream, _peer) = listener.accept()?;
        let io_timeout = Some(Duration::from_secs(2));
        stream.set_read_timeout(io_timeout)?;
        stream.set_write_timeout(io_timeout)?;

        let stop_requested = handle_connection(&mut stream, config)?;
        handled += 1;
        if stop_requested {
            break;
        }
    }
    Ok(())
}

/// Reads one HTTP request from `stream`, routes it, and writes the response.
///
/// Only the bytes returned by the first `read` (at most 8192) are inspected.
/// The request line supplies the method and target; header lines up to the
/// first blank line are collected with lower-cased names. Every path under
/// `/admin/` must pass `admin_authorized`, otherwise `401 Unauthorized` is
/// sent.
///
/// Routes (all `GET`):
/// - `/livez`: plain-text liveness reply.
/// - `/readyz` and `/admin/status`: the `ops::service_report` JSON.
/// - `/metrics`: the text produced by `ops::metrics_prometheus_text`.
/// - `/v1/query?q=...`: runs the query through `execute_query`.
/// - `/admin/lifecycle?action=...`: echoes the action (default `status`).
///
/// Anything else receives `404 Not Found`.
///
/// # Returns
///
/// `Ok(true)` only when `/admin/lifecycle` was called with `action=stop`,
/// signalling the caller to stop serving; `Ok(false)` otherwise, including
/// when the peer sent nothing or the read timed out.
///
/// # Errors
///
/// Propagates read failures other than timeouts, failures from
/// `write_response`, and failures from `execute_query`.
fn handle_connection(stream: &mut TcpStream, config: &ServiceConfig) -> Result<bool, ServiceError> {
    let mut buffer = [0_u8; 8192];
    let bytes_read = match stream.read(&mut buffer) {
        Ok(count) => count,
        // A peer that connects but never sends a request trips the socket's
        // read timeout. Treat it like an empty read so that one idle client
        // cannot turn into an error for the whole service.
        Err(err)
            if matches!(
                err.kind(),
                std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
            ) =>
        {
            return Ok(false);
        }
        Err(err) => return Err(err.into()),
    };
    if bytes_read == 0 {
        return Ok(false);
    }

    let request = String::from_utf8_lossy(&buffer[..bytes_read]);
    let mut lines = request.lines();
    let request_line = lines.next().unwrap_or("");
    let mut parts = request_line.split_whitespace();
    let method = parts.next().unwrap_or("");
    let target = parts.next().unwrap_or("/");

    // Header names are lower-cased so lookups are case-insensitive; lines
    // without a ':' are skipped.
    let headers = lines
        .take_while(|line| !line.is_empty())
        .filter_map(|line| line.split_once(':'))
        .map(|(name, value)| (name.trim().to_ascii_lowercase(), value.trim().to_string()))
        .collect::<std::collections::HashMap<String, String>>();

    let (path, query) = split_target(target);
    let needs_admin = path.starts_with("/admin/");
    if needs_admin && !admin_authorized(config, &headers) {
        write_response(stream, "401 Unauthorized", "text/plain", "unauthorized\n")?;
        return Ok(false);
    }

    match (method, path) {
        ("GET", "/livez") => {
            write_response(stream, "200 OK", "text/plain", "live\n")?;
            Ok(false)
        }
        // Both endpoints return the same service report; `/admin/status`
        // differs only in having passed the admin check above.
        ("GET", "/readyz" | "/admin/status") => {
            let checked_at = unix_now();
            let report = ops::service_report(Some(&config.listen_address), checked_at);
            write_response(
                stream,
                "200 OK",
                "application/json",
                &report.to_json_pretty(),
            )?;
            Ok(false)
        }
        ("GET", "/metrics") => {
            write_response(
                stream,
                "200 OK",
                "text/plain; version=0.0.4",
                &ops::metrics_prometheus_text(),
            )?;
            Ok(false)
        }
        ("GET", "/v1/query") => {
            let params = parse_query_params(query);
            let Some(query_text) = params.get("q") else {
                write_response(stream, "400 Bad Request", "text/plain", "missing q\n")?;
                return Ok(false);
            };
            let body = execute_query(&config.data_dir, query_text)?;
            write_response(stream, "200 OK", "application/json", &body)?;
            Ok(false)
        }
        ("GET", "/admin/lifecycle") => {
            let params = parse_query_params(query);
            let action = params.get("action").map(String::as_str).unwrap_or("status");
            // `action` comes straight from the request, so it is escaped
            // before being placed inside a JSON string literal.
            let body = format!(
                concat!(
                    "{{\n",
                    "  \"action\": \"{}\",\n",
                    "  \"supported\": [\"status\", \"stop\"],\n",
                    "  \"admin_token_required\": {}\n",
                    "}}\n"
                ),
                escape_json_string(action),
                config.admin_token.is_some()
            );
            write_response(stream, "200 OK", "application/json", &body)?;
            Ok(action == "stop")
        }
        _ => {
            write_response(stream, "404 Not Found", "text/plain", "not found\n")?;
            Ok(false)
        }
    }
}

/// Escapes `raw` so it can be embedded between double quotes in a JSON document.
///
/// Quotes, backslashes and control characters below U+0020 are escaped; every
/// other character is copied through unchanged.
fn escape_json_string(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}

/// Runs `query_text` against the embedded database stored under `data_dir`
/// and returns the result rendered by `to_json_pretty`.
///
/// A new embedded driver is opened on every call.
///
/// # Errors
///
/// Propagates any failure from opening the driver or executing the query.
fn execute_query(data_dir: &Path, query_text: &str) -> Result<String, ServiceError> {
    let driver_config = DriverConfig::embedded_data_dir(data_dir.to_path_buf());
    let driver = embedded_driver(driver_config)?;
    let result_rows = driver.query(query_text)?;
    let body = result_rows.to_json_pretty();
    Ok(body)
}