Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48    crate::presentation::admin_json::analytics_job_json(job)
49}
50
51fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
52    crate::presentation::admin_json::graph_projection_json(projection)
53}
54
55pub mod handlers_admin;
56mod handlers_ai;
57mod handlers_auth;
58mod handlers_backup;
59mod handlers_ec;
60mod handlers_entity;
61mod handlers_geo;
62mod handlers_graph;
63mod handlers_keyed;
64mod handlers_log;
65mod handlers_ops;
66mod handlers_query;
67mod handlers_replication;
68mod handlers_vcs;
69mod handlers_vector;
70pub mod header_escape_guard;
71pub mod ingest_pipeline;
72mod patch_support;
73mod request_body;
74mod request_context;
75mod routing;
76mod serverless_support;
77pub mod tls;
78mod transport;
79
80use self::handlers_ai::*;
81use self::handlers_entity::*;
82use self::handlers_graph::*;
83use self::handlers_keyed::*;
84use self::handlers_ops::*;
85use self::handlers_query::*;
86use self::patch_support::*;
87use self::request_body::*;
88use self::routing::*;
89use self::serverless_support::*;
90use self::transport::*;
91
92/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
93/// can serve either every public surface (`Public`, default) or a
94/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
95/// the top of `route()` consults this so a port bound only to
96/// loopback for admin work won't accidentally hand out DML.
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ServerSurface {
99    /// Everything routed normally (default — matches v0 behaviour).
100    Public,
101    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
102    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
103    /// which default to `127.0.0.1`.
104    AdminOnly,
105    /// Only `/metrics` and `/health/*`. Intended for
106    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
107    /// exposed to non-admin networks.
108    MetricsOnly,
109}
110
111#[derive(Debug, Clone)]
112pub struct ServerOptions {
113    pub bind_addr: String,
114    pub max_body_bytes: usize,
115    pub read_timeout_ms: u64,
116    pub write_timeout_ms: u64,
117    pub max_scan_limit: usize,
118    /// Which subset of paths this listener serves. Defaults to
119    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
120    /// admin / scrape ports (PLAN.md Phase 6.2).
121    pub surface: ServerSurface,
122}
123
124impl Default for ServerOptions {
125    fn default() -> Self {
126        Self {
127            bind_addr: "127.0.0.1:5055".to_string(),
128            max_body_bytes: 1024 * 1024,
129            read_timeout_ms: 5_000,
130            write_timeout_ms: 5_000,
131            max_scan_limit: 1_000,
132            surface: ServerSurface::Public,
133        }
134    }
135}
136
137/// Replication state exposed to the HTTP server.
138pub struct ServerReplicationState {
139    pub config: crate::replication::ReplicationConfig,
140    pub primary: Option<crate::replication::primary::PrimaryReplication>,
141}
142
143#[derive(Clone)]
144pub struct RedDBServer {
145    runtime: RedDBRuntime,
146    options: ServerOptions,
147    auth_store: Option<Arc<AuthStore>>,
148    replication: Option<Arc<ServerReplicationState>>,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152enum ServerlessWarmupScope {
153    Indexes,
154    GraphProjections,
155    AnalyticsJobs,
156    NativeArtifacts,
157}
158
159#[derive(Debug, Clone, Copy, PartialEq, Eq)]
160enum DeploymentProfile {
161    Embedded,
162    Server,
163    Serverless,
164}
165
166#[derive(Debug, Clone)]
167struct ParsedQueryRequest {
168    query: String,
169    entity_types: Option<Vec<String>>,
170    capabilities: Option<Vec<String>>,
171}
172
173#[derive(Debug, Clone, Copy)]
174enum PatchOperationType {
175    Set,
176    Replace,
177    Unset,
178}
179
180#[derive(Debug, Clone)]
181struct PatchOperation {
182    op: PatchOperationType,
183    path: Vec<String>,
184    value: Option<JsonValue>,
185}
186
187impl RedDBServer {
188    pub fn new(runtime: RedDBRuntime) -> Self {
189        Self::with_options(runtime, ServerOptions::default())
190    }
191
192    pub fn from_database_options(
193        db_options: RedDBOptions,
194        server_options: ServerOptions,
195    ) -> RedDBResult<Self> {
196        let runtime = RedDBRuntime::with_options(db_options)?;
197        Ok(Self::with_options(runtime, server_options))
198    }
199
200    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
201        Self {
202            runtime,
203            options,
204            auth_store: None,
205            replication: None,
206        }
207    }
208
209    /// Attach an `AuthStore` for HTTP-layer authentication.
210    /// Also injects the store into the runtime so that `Value::Secret`
211    /// auto-encrypt/decrypt can reach the vault AES key.
212    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
213        self.runtime.set_auth_store(Arc::clone(&auth_store));
214        self.auth_store = Some(auth_store);
215        self
216    }
217
218    /// Attach replication state for status and snapshot endpoints.
219    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
220        self.replication = Some(state);
221        self
222    }
223
224    pub fn runtime(&self) -> &RedDBRuntime {
225        &self.runtime
226    }
227
228    pub fn options(&self) -> &ServerOptions {
229        &self.options
230    }
231
232    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
233        QueryUseCases::new(&self.runtime)
234    }
235
236    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
237        AdminUseCases::new(&self.runtime)
238    }
239
240    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
241        EntityUseCases::new(&self.runtime)
242    }
243
244    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
245        CatalogUseCases::new(&self.runtime)
246    }
247
248    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
249        GraphUseCases::new(&self.runtime)
250    }
251
252    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
253        NativeUseCases::new(&self.runtime)
254    }
255
256    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
257        TreeUseCases::new(&self.runtime)
258    }
259
260    pub fn serve(&self) -> io::Result<()> {
261        let listener = TcpListener::bind(&self.options.bind_addr)?;
262        self.serve_on(listener)
263    }
264
265    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
266        for stream in listener.incoming() {
267            match stream {
268                Ok(stream) => {
269                    // Spawn a thread per connection for concurrent request handling
270                    let server = self.clone();
271                    thread::spawn(move || {
272                        let _ = server.handle_connection(stream);
273                    });
274                }
275                Err(err) => return Err(err),
276            }
277        }
278        Ok(())
279    }
280
281    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
282        let server = self.clone();
283        thread::spawn(move || server.serve())
284    }
285
286    pub fn serve_in_background_on(
287        &self,
288        listener: TcpListener,
289    ) -> thread::JoinHandle<io::Result<()>> {
290        let server = self.clone();
291        thread::spawn(move || server.serve_on(listener))
292    }
293
294    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
295    /// `tls_config` is shared across all connections (rustls
296    /// `ServerConfig` is `Send + Sync`).
297    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
298        let listener = TcpListener::bind(&self.options.bind_addr)?;
299        self.serve_tls_on(listener, tls_config)
300    }
301
302    pub fn serve_tls_on(
303        &self,
304        listener: TcpListener,
305        tls_config: std::sync::Arc<rustls::ServerConfig>,
306    ) -> io::Result<()> {
307        for stream in listener.incoming() {
308            match stream {
309                Ok(stream) => {
310                    let server = self.clone();
311                    let cfg = tls_config.clone();
312                    thread::spawn(move || {
313                        let _ = server.handle_tls_connection(stream, cfg);
314                    });
315                }
316                Err(err) => return Err(err),
317            }
318        }
319        Ok(())
320    }
321
322    pub fn serve_tls_in_background(
323        &self,
324        tls_config: std::sync::Arc<rustls::ServerConfig>,
325    ) -> thread::JoinHandle<io::Result<()>> {
326        let server = self.clone();
327        thread::spawn(move || server.serve_tls(tls_config))
328    }
329
330    pub fn serve_tls_in_background_on(
331        &self,
332        listener: TcpListener,
333        tls_config: std::sync::Arc<rustls::ServerConfig>,
334    ) -> thread::JoinHandle<io::Result<()>> {
335        let server = self.clone();
336        thread::spawn(move || server.serve_tls_on(listener, tls_config))
337    }
338
339    fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
340        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
341        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
342
343        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
344        let response = self.route(request);
345        stream.write_all(&response.to_http_bytes())?;
346        stream.flush()?;
347        Ok(())
348    }
349
350    fn handle_tls_connection(
351        &self,
352        tcp: TcpStream,
353        tls_config: std::sync::Arc<rustls::ServerConfig>,
354    ) -> io::Result<()> {
355        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
356        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
357        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
358            Ok(s) => s,
359            Err(err) => {
360                tracing::warn!(
361                    target: "reddb::http_tls",
362                    err = %err,
363                    "TLS handshake failed"
364                );
365                return Err(err);
366            }
367        };
368        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
369            Ok(req) => req,
370            Err(err) => {
371                tracing::warn!(
372                    target: "reddb::http_tls",
373                    err = %err,
374                    "TLS request parse failed"
375                );
376                return Err(err);
377            }
378        };
379        let response = self.route(request);
380        tls_stream.write_all(&response.to_http_bytes())?;
381        tls_stream.flush()?;
382        Ok(())
383    }
384}