Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48    crate::presentation::admin_json::analytics_job_json(job)
49}
50
51fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
52    crate::presentation::admin_json::graph_projection_json(projection)
53}
54
55pub mod handlers_admin;
56mod handlers_ai;
57mod handlers_auth;
58mod handlers_backup;
59mod handlers_ec;
60mod handlers_entity;
61mod handlers_geo;
62mod handlers_graph;
63mod handlers_keyed;
64mod handlers_log;
65mod handlers_ops;
66mod handlers_query;
67mod handlers_replication;
68mod handlers_vcs;
69mod handlers_vector;
70pub mod header_escape_guard;
71pub mod ingest_pipeline;
72mod patch_support;
73mod request_body;
74mod request_context;
75mod routing;
76mod serverless_support;
77pub mod tls;
78mod transport;
79
80use self::handlers_ai::*;
81use self::handlers_entity::*;
82use self::handlers_graph::*;
83use self::handlers_keyed::*;
84use self::handlers_ops::*;
85use self::handlers_query::*;
86use self::patch_support::*;
87use self::request_body::*;
88use self::routing::*;
89use self::serverless_support::*;
90use self::transport::*;
91
/// Endpoint segregation for HTTP listeners (PLAN.md Phase 6.2).
///
/// A given HTTP listener can serve either every public surface (`Public`,
/// the value used by `ServerOptions::default()`) or a restricted slice
/// (`AdminOnly`, `MetricsOnly`). The route filter at the top of `route()`
/// consults this so a port bound only to loopback for admin work won't
/// accidentally hand out DML.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Everything routed normally (default — matches v0 behaviour).
    Public,
    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
    /// which default to `127.0.0.1`.
    AdminOnly,
    /// Only `/metrics` and `/health/*`. Intended for
    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
    /// exposed to non-admin networks.
    MetricsOnly,
}
110
/// Per-listener configuration for the HTTP server.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Socket address handed to `TcpListener::bind` by `serve` / `serve_tls`.
    pub bind_addr: String,
    /// Passed to `HttpRequest::read_from` for every request — presumably the
    /// maximum accepted body size in bytes; enforcement lives in the
    /// transport code (TODO confirm).
    pub max_body_bytes: usize,
    /// Socket read timeout, in milliseconds, set on each accepted connection.
    pub read_timeout_ms: u64,
    /// Socket write timeout, in milliseconds, set on each accepted connection.
    pub write_timeout_ms: u64,
    /// NOTE(review): not used in this part of the file; by its name an upper
    /// bound on scan page sizes — confirm against the scan handlers.
    pub max_scan_limit: usize,
    /// Which subset of paths this listener serves. Defaults to
    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
    /// admin / scrape ports (PLAN.md Phase 6.2).
    pub surface: ServerSurface,
}
123
124impl Default for ServerOptions {
125    fn default() -> Self {
126        Self {
127            bind_addr: "127.0.0.1:5055".to_string(),
128            max_body_bytes: 1024 * 1024,
129            read_timeout_ms: 5_000,
130            write_timeout_ms: 5_000,
131            max_scan_limit: 1_000,
132            surface: ServerSurface::Public,
133        }
134    }
135}
136
/// Replication state exposed to the HTTP server.
///
/// Attached to a `RedDBServer` through `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    /// Primary-side replication handle, if any — presumably `None` when this
    /// node is not acting as a primary (TODO confirm).
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
142
/// HTTP front end for a `RedDBRuntime`.
///
/// The type is `Clone`; the accept loops clone it once per accepted
/// connection so every handler thread owns its own copy.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime borrowed by the use-case facades (`query_use_cases`, ...).
    runtime: RedDBRuntime,
    /// Listener configuration (bind address, limits, timeouts, surface).
    options: ServerOptions,
    /// Auth store set by `with_auth`; `None` until one is attached.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state set by `with_replication`; `None` until attached.
    replication: Option<Arc<ServerReplicationState>>,
}
150
/// Categories of state that a serverless warmup request can target.
///
/// NOTE(review): the consumers of this enum are outside this section of the
/// file (likely `serverless_support`); confirm exact semantics there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
158
/// Deployment mode the server is running under.
///
/// NOTE(review): how each profile changes behaviour is decided outside this
/// section of the file; confirm against the handlers that match on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Standalone server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
165
/// Query request after its fields have been extracted into typed form.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity type names — presumably a filter on the
    /// entity kinds the query may return (TODO confirm in the query handler).
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names — semantics defined by the query
    /// handler (TODO confirm).
    capabilities: Option<Vec<String>>,
    /// Optional positional `$N` bind parameters (#358). When `Some`, the
    /// query handler runs the user_params binder before executing.
    /// Absence preserves the legacy `query`-only behavior.
    params: Option<Vec<Value>>,
}
176
/// Kind of mutation carried by a `PatchOperation`.
///
/// NOTE(review): the exact difference between `Set` and `Replace` is
/// implemented outside this section (see `patch_support`); confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    /// Set a value at the target path.
    Set,
    /// Replace the value at the target path.
    Replace,
    /// Remove the value at the target path.
    Unset,
}
183
/// One step of an entity patch: an operation kind, a target path, and an
/// optional JSON payload.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Which mutation to apply.
    op: PatchOperationType,
    /// Path segments addressing the field to mutate.
    path: Vec<String>,
    /// JSON payload for the operation — presumably `None` for `Unset`
    /// (TODO confirm against the patch parser).
    value: Option<JsonValue>,
}
190
191impl RedDBServer {
192    pub fn new(runtime: RedDBRuntime) -> Self {
193        Self::with_options(runtime, ServerOptions::default())
194    }
195
196    pub fn from_database_options(
197        db_options: RedDBOptions,
198        server_options: ServerOptions,
199    ) -> RedDBResult<Self> {
200        let runtime = RedDBRuntime::with_options(db_options)?;
201        Ok(Self::with_options(runtime, server_options))
202    }
203
204    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
205        Self {
206            runtime,
207            options,
208            auth_store: None,
209            replication: None,
210        }
211    }
212
213    /// Attach an `AuthStore` for HTTP-layer authentication.
214    /// Also injects the store into the runtime so that `Value::Secret`
215    /// auto-encrypt/decrypt can reach the vault AES key.
216    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
217        self.runtime.set_auth_store(Arc::clone(&auth_store));
218        self.auth_store = Some(auth_store);
219        self
220    }
221
222    /// Attach replication state for status and snapshot endpoints.
223    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
224        self.replication = Some(state);
225        self
226    }
227
228    pub fn runtime(&self) -> &RedDBRuntime {
229        &self.runtime
230    }
231
232    pub fn options(&self) -> &ServerOptions {
233        &self.options
234    }
235
236    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
237        QueryUseCases::new(&self.runtime)
238    }
239
240    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
241        AdminUseCases::new(&self.runtime)
242    }
243
244    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
245        EntityUseCases::new(&self.runtime)
246    }
247
248    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
249        CatalogUseCases::new(&self.runtime)
250    }
251
252    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
253        GraphUseCases::new(&self.runtime)
254    }
255
256    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
257        NativeUseCases::new(&self.runtime)
258    }
259
260    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
261        TreeUseCases::new(&self.runtime)
262    }
263
264    pub fn serve(&self) -> io::Result<()> {
265        let listener = TcpListener::bind(&self.options.bind_addr)?;
266        self.serve_on(listener)
267    }
268
269    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
270        for stream in listener.incoming() {
271            match stream {
272                Ok(stream) => {
273                    // Spawn a thread per connection for concurrent request handling
274                    let server = self.clone();
275                    thread::spawn(move || {
276                        let _ = server.handle_connection(stream);
277                    });
278                }
279                Err(err) => return Err(err),
280            }
281        }
282        Ok(())
283    }
284
285    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
286        let server = self.clone();
287        thread::spawn(move || server.serve())
288    }
289
290    pub fn serve_in_background_on(
291        &self,
292        listener: TcpListener,
293    ) -> thread::JoinHandle<io::Result<()>> {
294        let server = self.clone();
295        thread::spawn(move || server.serve_on(listener))
296    }
297
298    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
299    /// `tls_config` is shared across all connections (rustls
300    /// `ServerConfig` is `Send + Sync`).
301    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
302        let listener = TcpListener::bind(&self.options.bind_addr)?;
303        self.serve_tls_on(listener, tls_config)
304    }
305
306    pub fn serve_tls_on(
307        &self,
308        listener: TcpListener,
309        tls_config: std::sync::Arc<rustls::ServerConfig>,
310    ) -> io::Result<()> {
311        for stream in listener.incoming() {
312            match stream {
313                Ok(stream) => {
314                    let server = self.clone();
315                    let cfg = tls_config.clone();
316                    thread::spawn(move || {
317                        let _ = server.handle_tls_connection(stream, cfg);
318                    });
319                }
320                Err(err) => return Err(err),
321            }
322        }
323        Ok(())
324    }
325
326    pub fn serve_tls_in_background(
327        &self,
328        tls_config: std::sync::Arc<rustls::ServerConfig>,
329    ) -> thread::JoinHandle<io::Result<()>> {
330        let server = self.clone();
331        thread::spawn(move || server.serve_tls(tls_config))
332    }
333
334    pub fn serve_tls_in_background_on(
335        &self,
336        listener: TcpListener,
337        tls_config: std::sync::Arc<rustls::ServerConfig>,
338    ) -> thread::JoinHandle<io::Result<()>> {
339        let server = self.clone();
340        thread::spawn(move || server.serve_tls_on(listener, tls_config))
341    }
342
343    fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
344        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
345        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
346
347        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
348        let response = self.route(request);
349        stream.write_all(&response.to_http_bytes())?;
350        stream.flush()?;
351        Ok(())
352    }
353
354    fn handle_tls_connection(
355        &self,
356        tcp: TcpStream,
357        tls_config: std::sync::Arc<rustls::ServerConfig>,
358    ) -> io::Result<()> {
359        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
360        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
361        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
362            Ok(s) => s,
363            Err(err) => {
364                tracing::warn!(
365                    target: "reddb::http_tls",
366                    err = %err,
367                    "TLS handshake failed"
368                );
369                return Err(err);
370            }
371        };
372        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
373            Ok(req) => req,
374            Err(err) => {
375                tracing::warn!(
376                    target: "reddb::http_tls",
377                    err = %err,
378                    "TLS request parse failed"
379                );
380                return Err(err);
381            }
382        };
383        let response = self.route(request);
384        tls_stream.write_all(&response.to_http_bytes())?;
385        tls_stream.flush()?;
386        Ok(())
387    }
388}