Skip to main content

reddb_server/
server.rs

1//! Minimal HTTP server for RedDB management and remote access.
2
3pub(crate) use crate::application::json_input::{
4    json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7    AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8    CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9    CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10    ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11    GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12    GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13    GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14    NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15    QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16    SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32    RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33    RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34    RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35    RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36    RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37    RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38    RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39    RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48    crate::presentation::admin_json::analytics_job_json(job)
49}
50
51#[cfg(test)]
52mod tests {
53    use super::*;
54    use crate::api::RedDBOptions;
55    use crate::health::HealthReport;
56    use crate::service_cli::{
57        TransportListenerFailure, TransportListenerState, TransportReadiness,
58    };
59
60    #[test]
61    fn health_json_reports_transport_listeners() {
62        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
63        let mut options = ServerOptions::default();
64        options.transport_readiness = TransportReadiness {
65            active: vec![TransportListenerState {
66                transport: "grpc".to_string(),
67                bind_addr: "127.0.0.1:50051".to_string(),
68                explicit: true,
69            }],
70            failed: vec![TransportListenerFailure {
71                transport: "http".to_string(),
72                bind_addr: "127.0.0.1:5055".to_string(),
73                explicit: false,
74                reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
75            }],
76        };
77        let server = RedDBServer::with_options(runtime, options);
78
79        let payload = server.health_json_with_transport(&HealthReport::healthy());
80        let JsonValue::Object(root) = payload else {
81            panic!("health payload should be an object");
82        };
83        let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
84            panic!("health payload should include transport_listeners");
85        };
86        let Some(JsonValue::Array(active)) = listeners.get("active") else {
87            panic!("transport_listeners.active should be an array");
88        };
89        let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
90            panic!("transport_listeners.failed should be an array");
91        };
92
93        assert_eq!(active.len(), 1);
94        assert_eq!(failed.len(), 1);
95    }
96}
97
98fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
99    crate::presentation::admin_json::graph_projection_json(projection)
100}
101
102pub mod handlers_admin;
103mod handlers_ai;
104mod handlers_auth;
105mod handlers_backup;
106mod handlers_ec;
107mod handlers_entity;
108mod handlers_geo;
109mod handlers_graph;
110mod handlers_keyed;
111mod handlers_log;
112mod handlers_metrics;
113mod handlers_ops;
114mod handlers_query;
115mod handlers_replication;
116mod handlers_vcs;
117mod handlers_vector;
118pub mod header_escape_guard;
119pub mod ingest_pipeline;
120mod patch_support;
121mod request_body;
122mod request_context;
123mod routing;
124mod serverless_support;
125pub mod tls;
126mod transport;
127
128use self::handlers_ai::*;
129use self::handlers_entity::*;
130use self::handlers_graph::*;
131use self::handlers_keyed::*;
132use self::handlers_metrics::*;
133use self::handlers_ops::*;
134use self::handlers_query::*;
135use self::patch_support::*;
136use self::request_body::*;
137use self::routing::*;
138use self::serverless_support::*;
139use self::transport::*;
140
141/// PLAN.md Phase 6.2 — endpoint segregation. A given HTTP listener
142/// can serve either every public surface (`Public`, default) or a
143/// restricted slice (`AdminOnly`, `MetricsOnly`). The route filter at
144/// the top of `route()` consults this so a port bound only to
145/// loopback for admin work won't accidentally hand out DML.
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum ServerSurface {
148    /// Everything routed normally (default — matches v0 behaviour).
149    Public,
150    /// Only `/admin/*`, `/metrics`, and `/health/*`. Other paths
151    /// return 404. Intended for `RED_ADMIN_BIND` operator listeners
152    /// which default to `127.0.0.1`.
153    AdminOnly,
154    /// Only `/metrics` and `/health/*`. Intended for
155    /// `RED_METRICS_BIND` Prometheus scrape ports that may be
156    /// exposed to non-admin networks.
157    MetricsOnly,
158}
159
160#[derive(Debug, Clone)]
161pub struct ServerOptions {
162    pub bind_addr: String,
163    pub max_body_bytes: usize,
164    pub read_timeout_ms: u64,
165    pub write_timeout_ms: u64,
166    pub max_scan_limit: usize,
167    /// Which subset of paths this listener serves. Defaults to
168    /// `Public`. Set to `AdminOnly` / `MetricsOnly` for dedicated
169    /// admin / scrape ports (PLAN.md Phase 6.2).
170    pub surface: ServerSurface,
171    pub transport_readiness: crate::service_cli::TransportReadiness,
172}
173
174impl Default for ServerOptions {
175    fn default() -> Self {
176        Self {
177            bind_addr: "127.0.0.1:5055".to_string(),
178            max_body_bytes: 1024 * 1024,
179            read_timeout_ms: 5_000,
180            write_timeout_ms: 5_000,
181            max_scan_limit: 1_000,
182            surface: ServerSurface::Public,
183            transport_readiness: crate::service_cli::TransportReadiness::default(),
184        }
185    }
186}
187
188/// Replication state exposed to the HTTP server.
189pub struct ServerReplicationState {
190    pub config: crate::replication::ReplicationConfig,
191    pub primary: Option<crate::replication::primary::PrimaryReplication>,
192}
193
194#[derive(Clone)]
195pub struct RedDBServer {
196    runtime: RedDBRuntime,
197    options: ServerOptions,
198    auth_store: Option<Arc<AuthStore>>,
199    replication: Option<Arc<ServerReplicationState>>,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq)]
203enum ServerlessWarmupScope {
204    Indexes,
205    GraphProjections,
206    AnalyticsJobs,
207    NativeArtifacts,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211enum DeploymentProfile {
212    Embedded,
213    Server,
214    Serverless,
215}
216
217fn percent_decode_path_segment(input: &str) -> Result<String, String> {
218    let bytes = input.as_bytes();
219    let mut out = Vec::with_capacity(bytes.len());
220    let mut index = 0;
221    while index < bytes.len() {
222        match bytes[index] {
223            b'%' => {
224                if index + 2 >= bytes.len() {
225                    return Err("truncated percent escape".to_string());
226                }
227                let high = hex_value(bytes[index + 1])
228                    .ok_or_else(|| "invalid percent escape".to_string())?;
229                let low = hex_value(bytes[index + 2])
230                    .ok_or_else(|| "invalid percent escape".to_string())?;
231                out.push((high << 4) | low);
232                index += 3;
233            }
234            byte => {
235                out.push(byte);
236                index += 1;
237            }
238        }
239    }
240    String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
241}
242
243fn hex_value(byte: u8) -> Option<u8> {
244    match byte {
245        b'0'..=b'9' => Some(byte - b'0'),
246        b'a'..=b'f' => Some(byte - b'a' + 10),
247        b'A'..=b'F' => Some(byte - b'A' + 10),
248        _ => None,
249    }
250}
251
252#[derive(Debug, Clone)]
253struct ParsedQueryRequest {
254    query: String,
255    entity_types: Option<Vec<String>>,
256    capabilities: Option<Vec<String>>,
257    /// Optional positional `$N` bind parameters (#358). When `Some`, the
258    /// query handler runs the user_params binder before executing.
259    /// Absence preserves the legacy `query`-only behavior.
260    params: Option<Vec<Value>>,
261}
262
263#[derive(Debug, Clone, Copy)]
264enum PatchOperationType {
265    Set,
266    Replace,
267    Unset,
268}
269
270#[derive(Debug, Clone)]
271struct PatchOperation {
272    op: PatchOperationType,
273    path: Vec<String>,
274    value: Option<JsonValue>,
275}
276
277impl RedDBServer {
278    pub fn new(runtime: RedDBRuntime) -> Self {
279        Self::with_options(runtime, ServerOptions::default())
280    }
281
282    pub fn from_database_options(
283        db_options: RedDBOptions,
284        server_options: ServerOptions,
285    ) -> RedDBResult<Self> {
286        let runtime = RedDBRuntime::with_options(db_options)?;
287        Ok(Self::with_options(runtime, server_options))
288    }
289
290    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
291        Self {
292            runtime,
293            options,
294            auth_store: None,
295            replication: None,
296        }
297    }
298
299    /// Attach an `AuthStore` for HTTP-layer authentication.
300    /// Also injects the store into the runtime so that `Value::Secret`
301    /// auto-encrypt/decrypt can reach the vault AES key.
302    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
303        self.runtime.set_auth_store(Arc::clone(&auth_store));
304        self.auth_store = Some(auth_store);
305        self
306    }
307
308    /// Attach replication state for status and snapshot endpoints.
309    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
310        self.replication = Some(state);
311        self
312    }
313
314    pub fn runtime(&self) -> &RedDBRuntime {
315        &self.runtime
316    }
317
318    pub fn options(&self) -> &ServerOptions {
319        &self.options
320    }
321
322    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
323        QueryUseCases::new(&self.runtime)
324    }
325
326    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
327        AdminUseCases::new(&self.runtime)
328    }
329
330    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
331        EntityUseCases::new(&self.runtime)
332    }
333
334    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
335        CatalogUseCases::new(&self.runtime)
336    }
337
338    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
339        GraphUseCases::new(&self.runtime)
340    }
341
342    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
343        NativeUseCases::new(&self.runtime)
344    }
345
346    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
347        TreeUseCases::new(&self.runtime)
348    }
349
350    fn transport_readiness_json(&self) -> JsonValue {
351        let active = self
352            .options
353            .transport_readiness
354            .active
355            .iter()
356            .map(|listener| {
357                let mut object = Map::new();
358                object.insert(
359                    "transport".to_string(),
360                    JsonValue::String(listener.transport.clone()),
361                );
362                object.insert(
363                    "bind_addr".to_string(),
364                    JsonValue::String(listener.bind_addr.clone()),
365                );
366                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
367                JsonValue::Object(object)
368            })
369            .collect();
370        let failed = self
371            .options
372            .transport_readiness
373            .failed
374            .iter()
375            .map(|listener| {
376                let mut object = Map::new();
377                object.insert(
378                    "transport".to_string(),
379                    JsonValue::String(listener.transport.clone()),
380                );
381                object.insert(
382                    "bind_addr".to_string(),
383                    JsonValue::String(listener.bind_addr.clone()),
384                );
385                object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
386                object.insert(
387                    "reason".to_string(),
388                    JsonValue::String(listener.reason.clone()),
389                );
390                JsonValue::Object(object)
391            })
392            .collect();
393
394        let mut object = Map::new();
395        object.insert("active".to_string(), JsonValue::Array(active));
396        object.insert("failed".to_string(), JsonValue::Array(failed));
397        JsonValue::Object(object)
398    }
399
400    fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
401        let mut value = crate::presentation::ops_json::health_json(report);
402        if let JsonValue::Object(ref mut object) = value {
403            object.insert(
404                "transport_listeners".to_string(),
405                self.transport_readiness_json(),
406            );
407        }
408        value
409    }
410
411    pub fn serve(&self) -> io::Result<()> {
412        let listener = TcpListener::bind(&self.options.bind_addr)?;
413        self.serve_on(listener)
414    }
415
416    pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
417        for stream in listener.incoming() {
418            match stream {
419                Ok(stream) => {
420                    // Spawn a thread per connection for concurrent request handling
421                    let server = self.clone();
422                    thread::spawn(move || {
423                        let _ = server.handle_connection(stream);
424                    });
425                }
426                Err(err) => return Err(err),
427            }
428        }
429        Ok(())
430    }
431
432    pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
433        let (stream, _) = listener.accept()?;
434        self.handle_connection(stream)
435    }
436
437    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
438        let server = self.clone();
439        thread::spawn(move || server.serve())
440    }
441
442    pub fn serve_in_background_on(
443        &self,
444        listener: TcpListener,
445    ) -> thread::JoinHandle<io::Result<()>> {
446        let server = self.clone();
447        thread::spawn(move || server.serve_on(listener))
448    }
449
450    /// Serve TLS-wrapped HTTPS on the configured `bind_addr`. The
451    /// `tls_config` is shared across all connections (rustls
452    /// `ServerConfig` is `Send + Sync`).
453    pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
454        let listener = TcpListener::bind(&self.options.bind_addr)?;
455        self.serve_tls_on(listener, tls_config)
456    }
457
458    pub fn serve_tls_on(
459        &self,
460        listener: TcpListener,
461        tls_config: std::sync::Arc<rustls::ServerConfig>,
462    ) -> io::Result<()> {
463        for stream in listener.incoming() {
464            match stream {
465                Ok(stream) => {
466                    let server = self.clone();
467                    let cfg = tls_config.clone();
468                    thread::spawn(move || {
469                        let _ = server.handle_tls_connection(stream, cfg);
470                    });
471                }
472                Err(err) => return Err(err),
473            }
474        }
475        Ok(())
476    }
477
478    pub fn serve_tls_in_background(
479        &self,
480        tls_config: std::sync::Arc<rustls::ServerConfig>,
481    ) -> thread::JoinHandle<io::Result<()>> {
482        let server = self.clone();
483        thread::spawn(move || server.serve_tls(tls_config))
484    }
485
486    pub fn serve_tls_in_background_on(
487        &self,
488        listener: TcpListener,
489        tls_config: std::sync::Arc<rustls::ServerConfig>,
490    ) -> thread::JoinHandle<io::Result<()>> {
491        let server = self.clone();
492        thread::spawn(move || server.serve_tls_on(listener, tls_config))
493    }
494
495    fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
496        stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
497        stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
498
499        let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
500        if self.try_route_streaming(&request, &mut stream)? {
501            return Ok(());
502        }
503        let response = self.route(request);
504        stream.write_all(&response.to_http_bytes())?;
505        stream.flush()?;
506        Ok(())
507    }
508
509    fn handle_tls_connection(
510        &self,
511        tcp: TcpStream,
512        tls_config: std::sync::Arc<rustls::ServerConfig>,
513    ) -> io::Result<()> {
514        tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
515        tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
516        let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
517            Ok(s) => s,
518            Err(err) => {
519                tracing::warn!(
520                    target: "reddb::http_tls",
521                    err = %err,
522                    "TLS handshake failed"
523                );
524                return Err(err);
525            }
526        };
527        let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
528            Ok(req) => req,
529            Err(err) => {
530                tracing::warn!(
531                    target: "reddb::http_tls",
532                    err = %err,
533                    "TLS request parse failed"
534                );
535                return Err(err);
536            }
537        };
538        if self.try_route_streaming(&request, &mut tls_stream)? {
539            return Ok(());
540        }
541        let response = self.route(request);
542        tls_stream.write_all(&response.to_http_bytes())?;
543        tls_stream.flush()?;
544        Ok(())
545    }
546}