1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48 crate::presentation::admin_json::analytics_job_json(job)
49}
50
51#[cfg(test)]
52mod tests {
53 use super::*;
54 use crate::api::RedDBOptions;
55 use crate::health::HealthReport;
56 use crate::service_cli::{
57 TransportListenerFailure, TransportListenerState, TransportReadiness,
58 };
59
60 #[test]
61 fn health_json_reports_transport_listeners() {
62 let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");
63 let mut options = ServerOptions::default();
64 options.transport_readiness = TransportReadiness {
65 active: vec![TransportListenerState {
66 transport: "grpc".to_string(),
67 bind_addr: "127.0.0.1:50051".to_string(),
68 explicit: true,
69 }],
70 failed: vec![TransportListenerFailure {
71 transport: "http".to_string(),
72 bind_addr: "127.0.0.1:5055".to_string(),
73 explicit: false,
74 reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
75 }],
76 };
77 let server = RedDBServer::with_options(runtime, options);
78
79 let payload = server.health_json_with_transport(&HealthReport::healthy());
80 let JsonValue::Object(root) = payload else {
81 panic!("health payload should be an object");
82 };
83 let Some(JsonValue::Object(listeners)) = root.get("transport_listeners") else {
84 panic!("health payload should include transport_listeners");
85 };
86 let Some(JsonValue::Array(active)) = listeners.get("active") else {
87 panic!("transport_listeners.active should be an array");
88 };
89 let Some(JsonValue::Array(failed)) = listeners.get("failed") else {
90 panic!("transport_listeners.failed should be an array");
91 };
92
93 assert_eq!(active.len(), 1);
94 assert_eq!(failed.len(), 1);
95 }
96}
97
98fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
99 crate::presentation::admin_json::graph_projection_json(projection)
100}
101
102pub mod handlers_admin;
103mod handlers_ai;
104mod handlers_auth;
105mod handlers_backup;
106mod handlers_ec;
107mod handlers_entity;
108mod handlers_geo;
109mod handlers_graph;
110mod handlers_keyed;
111mod handlers_log;
112mod handlers_metrics;
113mod handlers_ops;
114mod handlers_query;
115mod handlers_replication;
116mod handlers_vcs;
117mod handlers_vector;
118pub mod header_escape_guard;
119pub mod ingest_pipeline;
120mod patch_support;
121mod request_body;
122mod request_context;
123mod routing;
124mod serverless_support;
125pub mod tls;
126mod transport;
127
128use self::handlers_ai::*;
129use self::handlers_entity::*;
130use self::handlers_graph::*;
131use self::handlers_keyed::*;
132use self::handlers_metrics::*;
133use self::handlers_ops::*;
134use self::handlers_query::*;
135use self::patch_support::*;
136use self::request_body::*;
137use self::routing::*;
138use self::serverless_support::*;
139use self::transport::*;
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum ServerSurface {
148 Public,
150 AdminOnly,
154 MetricsOnly,
158}
159
160#[derive(Debug, Clone)]
161pub struct ServerOptions {
162 pub bind_addr: String,
163 pub max_body_bytes: usize,
164 pub read_timeout_ms: u64,
165 pub write_timeout_ms: u64,
166 pub max_scan_limit: usize,
167 pub surface: ServerSurface,
171 pub transport_readiness: crate::service_cli::TransportReadiness,
172}
173
174impl Default for ServerOptions {
175 fn default() -> Self {
176 Self {
177 bind_addr: "127.0.0.1:5055".to_string(),
178 max_body_bytes: 1024 * 1024,
179 read_timeout_ms: 5_000,
180 write_timeout_ms: 5_000,
181 max_scan_limit: 1_000,
182 surface: ServerSurface::Public,
183 transport_readiness: crate::service_cli::TransportReadiness::default(),
184 }
185 }
186}
187
188pub struct ServerReplicationState {
190 pub config: crate::replication::ReplicationConfig,
191 pub primary: Option<crate::replication::primary::PrimaryReplication>,
192}
193
194#[derive(Clone)]
195pub struct RedDBServer {
196 runtime: RedDBRuntime,
197 options: ServerOptions,
198 auth_store: Option<Arc<AuthStore>>,
199 replication: Option<Arc<ServerReplicationState>>,
200}
201
202#[derive(Debug, Clone, Copy, PartialEq, Eq)]
203enum ServerlessWarmupScope {
204 Indexes,
205 GraphProjections,
206 AnalyticsJobs,
207 NativeArtifacts,
208}
209
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211enum DeploymentProfile {
212 Embedded,
213 Server,
214 Serverless,
215}
216
217fn percent_decode_path_segment(input: &str) -> Result<String, String> {
218 let bytes = input.as_bytes();
219 let mut out = Vec::with_capacity(bytes.len());
220 let mut index = 0;
221 while index < bytes.len() {
222 match bytes[index] {
223 b'%' => {
224 if index + 2 >= bytes.len() {
225 return Err("truncated percent escape".to_string());
226 }
227 let high = hex_value(bytes[index + 1])
228 .ok_or_else(|| "invalid percent escape".to_string())?;
229 let low = hex_value(bytes[index + 2])
230 .ok_or_else(|| "invalid percent escape".to_string())?;
231 out.push((high << 4) | low);
232 index += 3;
233 }
234 byte => {
235 out.push(byte);
236 index += 1;
237 }
238 }
239 }
240 String::from_utf8(out).map_err(|_| "path segment is not valid UTF-8".to_string())
241}
242
243fn hex_value(byte: u8) -> Option<u8> {
244 match byte {
245 b'0'..=b'9' => Some(byte - b'0'),
246 b'a'..=b'f' => Some(byte - b'a' + 10),
247 b'A'..=b'F' => Some(byte - b'A' + 10),
248 _ => None,
249 }
250}
251
252#[derive(Debug, Clone)]
253struct ParsedQueryRequest {
254 query: String,
255 entity_types: Option<Vec<String>>,
256 capabilities: Option<Vec<String>>,
257 params: Option<Vec<Value>>,
261}
262
263#[derive(Debug, Clone, Copy)]
264enum PatchOperationType {
265 Set,
266 Replace,
267 Unset,
268}
269
270#[derive(Debug, Clone)]
271struct PatchOperation {
272 op: PatchOperationType,
273 path: Vec<String>,
274 value: Option<JsonValue>,
275}
276
277impl RedDBServer {
278 pub fn new(runtime: RedDBRuntime) -> Self {
279 Self::with_options(runtime, ServerOptions::default())
280 }
281
282 pub fn from_database_options(
283 db_options: RedDBOptions,
284 server_options: ServerOptions,
285 ) -> RedDBResult<Self> {
286 let runtime = RedDBRuntime::with_options(db_options)?;
287 Ok(Self::with_options(runtime, server_options))
288 }
289
290 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
291 Self {
292 runtime,
293 options,
294 auth_store: None,
295 replication: None,
296 }
297 }
298
299 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
303 self.runtime.set_auth_store(Arc::clone(&auth_store));
304 self.auth_store = Some(auth_store);
305 self
306 }
307
308 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
310 self.replication = Some(state);
311 self
312 }
313
314 pub fn runtime(&self) -> &RedDBRuntime {
315 &self.runtime
316 }
317
318 pub fn options(&self) -> &ServerOptions {
319 &self.options
320 }
321
322 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
323 QueryUseCases::new(&self.runtime)
324 }
325
326 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
327 AdminUseCases::new(&self.runtime)
328 }
329
330 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
331 EntityUseCases::new(&self.runtime)
332 }
333
334 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
335 CatalogUseCases::new(&self.runtime)
336 }
337
338 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
339 GraphUseCases::new(&self.runtime)
340 }
341
342 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
343 NativeUseCases::new(&self.runtime)
344 }
345
346 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
347 TreeUseCases::new(&self.runtime)
348 }
349
350 fn transport_readiness_json(&self) -> JsonValue {
351 let active = self
352 .options
353 .transport_readiness
354 .active
355 .iter()
356 .map(|listener| {
357 let mut object = Map::new();
358 object.insert(
359 "transport".to_string(),
360 JsonValue::String(listener.transport.clone()),
361 );
362 object.insert(
363 "bind_addr".to_string(),
364 JsonValue::String(listener.bind_addr.clone()),
365 );
366 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
367 JsonValue::Object(object)
368 })
369 .collect();
370 let failed = self
371 .options
372 .transport_readiness
373 .failed
374 .iter()
375 .map(|listener| {
376 let mut object = Map::new();
377 object.insert(
378 "transport".to_string(),
379 JsonValue::String(listener.transport.clone()),
380 );
381 object.insert(
382 "bind_addr".to_string(),
383 JsonValue::String(listener.bind_addr.clone()),
384 );
385 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
386 object.insert(
387 "reason".to_string(),
388 JsonValue::String(listener.reason.clone()),
389 );
390 JsonValue::Object(object)
391 })
392 .collect();
393
394 let mut object = Map::new();
395 object.insert("active".to_string(), JsonValue::Array(active));
396 object.insert("failed".to_string(), JsonValue::Array(failed));
397 JsonValue::Object(object)
398 }
399
400 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
401 let mut value = crate::presentation::ops_json::health_json(report);
402 if let JsonValue::Object(ref mut object) = value {
403 object.insert(
404 "transport_listeners".to_string(),
405 self.transport_readiness_json(),
406 );
407 }
408 value
409 }
410
411 pub fn serve(&self) -> io::Result<()> {
412 let listener = TcpListener::bind(&self.options.bind_addr)?;
413 self.serve_on(listener)
414 }
415
416 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
417 for stream in listener.incoming() {
418 match stream {
419 Ok(stream) => {
420 let server = self.clone();
422 thread::spawn(move || {
423 let _ = server.handle_connection(stream);
424 });
425 }
426 Err(err) => return Err(err),
427 }
428 }
429 Ok(())
430 }
431
432 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
433 let (stream, _) = listener.accept()?;
434 self.handle_connection(stream)
435 }
436
437 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
438 let server = self.clone();
439 thread::spawn(move || server.serve())
440 }
441
442 pub fn serve_in_background_on(
443 &self,
444 listener: TcpListener,
445 ) -> thread::JoinHandle<io::Result<()>> {
446 let server = self.clone();
447 thread::spawn(move || server.serve_on(listener))
448 }
449
450 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
454 let listener = TcpListener::bind(&self.options.bind_addr)?;
455 self.serve_tls_on(listener, tls_config)
456 }
457
458 pub fn serve_tls_on(
459 &self,
460 listener: TcpListener,
461 tls_config: std::sync::Arc<rustls::ServerConfig>,
462 ) -> io::Result<()> {
463 for stream in listener.incoming() {
464 match stream {
465 Ok(stream) => {
466 let server = self.clone();
467 let cfg = tls_config.clone();
468 thread::spawn(move || {
469 let _ = server.handle_tls_connection(stream, cfg);
470 });
471 }
472 Err(err) => return Err(err),
473 }
474 }
475 Ok(())
476 }
477
478 pub fn serve_tls_in_background(
479 &self,
480 tls_config: std::sync::Arc<rustls::ServerConfig>,
481 ) -> thread::JoinHandle<io::Result<()>> {
482 let server = self.clone();
483 thread::spawn(move || server.serve_tls(tls_config))
484 }
485
486 pub fn serve_tls_in_background_on(
487 &self,
488 listener: TcpListener,
489 tls_config: std::sync::Arc<rustls::ServerConfig>,
490 ) -> thread::JoinHandle<io::Result<()>> {
491 let server = self.clone();
492 thread::spawn(move || server.serve_tls_on(listener, tls_config))
493 }
494
495 fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
496 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
497 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
498
499 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
500 if self.try_route_streaming(&request, &mut stream)? {
501 return Ok(());
502 }
503 let response = self.route(request);
504 stream.write_all(&response.to_http_bytes())?;
505 stream.flush()?;
506 Ok(())
507 }
508
509 fn handle_tls_connection(
510 &self,
511 tcp: TcpStream,
512 tls_config: std::sync::Arc<rustls::ServerConfig>,
513 ) -> io::Result<()> {
514 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
515 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
516 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
517 Ok(s) => s,
518 Err(err) => {
519 tracing::warn!(
520 target: "reddb::http_tls",
521 err = %err,
522 "TLS handshake failed"
523 );
524 return Err(err);
525 }
526 };
527 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
528 Ok(req) => req,
529 Err(err) => {
530 tracing::warn!(
531 target: "reddb::http_tls",
532 err = %err,
533 "TLS request parse failed"
534 );
535 return Err(err);
536 }
537 };
538 if self.try_route_streaming(&request, &mut tls_stream)? {
539 return Ok(());
540 }
541 let response = self.route(request);
542 tls_stream.write_all(&response.to_http_bytes())?;
543 tls_stream.flush()?;
544 Ok(())
545 }
546}