reddb-io-server 1.1.1

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
//! Minimal HTTP server for RedDB management and remote access.

pub(crate) use crate::application::json_input::{
    json_bool_field, json_f32_field, json_string_field, json_usize_field,
};
pub(crate) use crate::application::{
    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
    SearchTextInput, TreeUseCases,
};
use std::collections::{BTreeMap, HashMap};
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use std::sync::Arc;

use crate::api::{RedDBError, RedDBOptions, RedDBResult};
use crate::auth::store::AuthStore;
use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
use crate::health::{HealthProvider, HealthReport, HealthState};
use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
use crate::runtime::{
    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
};
use crate::storage::schema::Value;
use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};

fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
    crate::presentation::admin_json::analytics_job_json(job)
}

fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
    crate::presentation::admin_json::graph_projection_json(projection)
}

pub mod handlers_admin;
mod handlers_ai;
mod handlers_auth;
mod handlers_backup;
mod handlers_ec;
mod handlers_entity;
mod handlers_geo;
mod handlers_graph;
mod handlers_keyed;
mod handlers_log;
mod handlers_ops;
mod handlers_query;
mod handlers_replication;
mod handlers_vcs;
mod handlers_vector;
pub mod header_escape_guard;
pub mod ingest_pipeline;
mod patch_support;
mod request_body;
mod request_context;
mod routing;
mod serverless_support;
pub mod tls;
mod transport;

use self::handlers_ai::*;
use self::handlers_entity::*;
use self::handlers_graph::*;
use self::handlers_keyed::*;
use self::handlers_ops::*;
use self::handlers_query::*;
use self::patch_support::*;
use self::request_body::*;
use self::routing::*;
use self::serverless_support::*;
use self::transport::*;

/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
/// can serve either every public surface (`Public`, default) or a
/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
/// the top of `route()` consults this so a port bound only to
/// loopback for admin work won't accidentally hand out DML.
///
/// The value is carried per listener in [`ServerOptions::surface`];
/// `ServerOptions::default()` selects `Public`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}

/// Per-listener configuration for [`RedDBServer`].
///
/// `ServerOptions::default()` yields a loopback listener on port 5055 with
/// a 1 MiB body limit and 5 second socket timeouts.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address handed to `TcpListener::bind` by `serve` / `serve_tls`.
    /// Default: `"127.0.0.1:5055"`.
    pub bind_addr: String,
    /// Body-size value passed to `HttpRequest::read_from` for every
    /// connection. Default: `1024 * 1024`.
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, applied to each accepted
    /// connection. Default: `5_000`.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, applied to each accepted
    /// connection. Default: `5_000`.
    pub write_timeout_ms: u64,
    /// Default: `1_000`.
    /// NOTE(review): presumably the upper bound on scan page sizes; the
    /// consumer is not in this part of the file — confirm.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
}

impl Default for ServerOptions {
    fn default() -> Self {
        Self {
            bind_addr: "127.0.0.1:5055".to_string(),
            max_body_bytes: 1024 * 1024,
            read_timeout_ms: 5_000,
            write_timeout_ms: 5_000,
            max_scan_limit: 1_000,
            surface: ServerSurface::Public,
        }
    }
}

/// Replication state exposed to the HTTP server.
///
/// Attached to a [`RedDBServer`] through `with_replication`, which stores it
/// behind an `Arc`.
pub struct ServerReplicationState {
    /// Replication configuration for this server.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, if any.
    /// NOTE(review): presumably `Some` only when this node acts as a
    /// primary — confirm against the code that builds this struct.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}

/// HTTP front-end over a [`RedDBRuntime`].
///
/// The type is `Clone` because the accept loops (`serve_on`,
/// `serve_tls_on`) clone the server once per accepted connection and move
/// the clone into that connection's thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Database runtime that the use-case accessors borrow.
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, limits, timeouts, surface).
    options: ServerOptions,
    /// Authentication store; `None` until `with_auth` is called.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state; `None` until `with_replication` is called.
    replication: Option<Arc<ServerReplicationState>>,
}

/// Category of state that a serverless warmup request can name.
///
/// NOTE(review): nothing in this part of the file consumes the enum;
/// presumably it is used by the `serverless_support` handlers — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}

/// Deployment shape the server can be described as running in.
///
/// NOTE(review): how a profile is selected and what it changes is not
/// visible in this part of the file — confirm against the consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Standalone server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}

/// Fields of a query request after its payload has been parsed.
///
/// NOTE(review): the parser that builds this struct is outside this part
/// of the file; the field notes below restate names and types only.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}

/// Kind of change a single [`PatchOperation`] requests.
///
/// NOTE(review): looks like the HTTP-side counterpart of the application
/// layer's `PatchEntityOperationType`; the mapping is not visible here —
/// confirm in `patch_support`.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set a value.
    Set,
    /// Replace a value.
    Replace,
    /// Remove a value.
    Unset,
}

/// One parsed patch instruction: an operation kind, a target path and an
/// optional JSON operand.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// What to do at `path`.
    op: PatchOperationType,
    /// Path segments identifying the target.
    /// NOTE(review): presumably ordered from the document root — verify.
    path: Vec<String>,
    /// JSON operand for the operation.
    /// NOTE(review): presumably `None` for `Unset` — verify against the parser.
    value: Option<JsonValue>,
}

impl RedDBServer {
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }

    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }

    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
        Self {
            runtime,
            options,
            auth_store: None,
            replication: None,
        }
    }

    /// Attach an `AuthStore` for HTTP-layer authentication.
    /// Also injects the store into the runtime so that `Value::Secret`
    /// auto-encrypt/decrypt can reach the vault AES key.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }

    /// Attach replication state for status and snapshot endpoints.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }

    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }

    pub fn options(&self) -> &ServerOptions {
        &self.options
    }

    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }

    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }

    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }

    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }

    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }

    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }

    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }

    pub fn serve(&self) -> io::Result<()> {
        let listener = TcpListener::bind(&self.options.bind_addr)?;
        self.serve_on(listener)
    }

    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    // Spawn a thread per connection for concurrent request handling
                    let server = self.clone();
                    thread::spawn(move || {
                        let _ = server.handle_connection(stream);
                    });
                }
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }

    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }

    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_on(listener))
    }

    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
    /// `tls_config` is shared across all connections (rustls
    /// `ServerConfig` is `Send + Sync`).
    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
        let listener = TcpListener::bind(&self.options.bind_addr)?;
        self.serve_tls_on(listener, tls_config)
    }

    pub fn serve_tls_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> io::Result<()> {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let server = self.clone();
                    let cfg = tls_config.clone();
                    thread::spawn(move || {
                        let _ = server.handle_tls_connection(stream, cfg);
                    });
                }
                Err(err) => return Err(err),
            }
        }
        Ok(())
    }

    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }

    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls_on(listener, tls_config))
    }

    fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;

        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
        if self.try_route_streaming(&request, &mut stream)? {
            return Ok(());
        }
        let response = self.route(request);
        stream.write_all(&response.to_http_bytes())?;
        stream.flush()?;
        Ok(())
    }

    fn handle_tls_connection(
        &self,
        tcp: TcpStream,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> io::Result<()> {
        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
            Ok(s) => s,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS handshake failed"
                );
                return Err(err);
            }
        };
        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
            Ok(req) => req,
            Err(err) => {
                tracing::warn!(
                    target: "reddb::http_tls",
                    err = %err,
                    "TLS request parse failed"
                );
                return Err(err);
            }
        };
        if self.try_route_streaming(&request, &mut tls_stream)? {
            return Ok(());
        }
        let response = self.route(request);
        tls_stream.write_all(&response.to_http_bytes())?;
        tls_stream.flush()?;
        Ok(())
    }
}