1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48 crate::presentation::admin_json::analytics_job_json(job)
49}
50
51fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
52 crate::presentation::admin_json::graph_projection_json(projection)
53}
54
55pub mod handlers_admin;
56mod handlers_ai;
57mod handlers_auth;
58mod handlers_backup;
59mod handlers_ec;
60mod handlers_entity;
61mod handlers_geo;
62mod handlers_graph;
63mod handlers_keyed;
64mod handlers_log;
65mod handlers_ops;
66mod handlers_query;
67mod handlers_replication;
68mod handlers_vcs;
69mod handlers_vector;
70pub mod header_escape_guard;
71pub mod ingest_pipeline;
72mod patch_support;
73mod request_body;
74mod request_context;
75mod routing;
76mod serverless_support;
77pub mod tls;
78mod transport;
79
80use self::handlers_ai::*;
81use self::handlers_entity::*;
82use self::handlers_graph::*;
83use self::handlers_keyed::*;
84use self::handlers_ops::*;
85use self::handlers_query::*;
86use self::patch_support::*;
87use self::request_body::*;
88use self::routing::*;
89use self::serverless_support::*;
90use self::transport::*;
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum ServerSurface {
99 Public,
101 AdminOnly,
105 MetricsOnly,
109}
110
111#[derive(Debug, Clone)]
112pub struct ServerOptions {
113 pub bind_addr: String,
114 pub max_body_bytes: usize,
115 pub read_timeout_ms: u64,
116 pub write_timeout_ms: u64,
117 pub max_scan_limit: usize,
118 pub surface: ServerSurface,
122}
123
124impl Default for ServerOptions {
125 fn default() -> Self {
126 Self {
127 bind_addr: "127.0.0.1:5055".to_string(),
128 max_body_bytes: 1024 * 1024,
129 read_timeout_ms: 5_000,
130 write_timeout_ms: 5_000,
131 max_scan_limit: 1_000,
132 surface: ServerSurface::Public,
133 }
134 }
135}
136
137pub struct ServerReplicationState {
139 pub config: crate::replication::ReplicationConfig,
140 pub primary: Option<crate::replication::primary::PrimaryReplication>,
141}
142
143#[derive(Clone)]
144pub struct RedDBServer {
145 runtime: RedDBRuntime,
146 options: ServerOptions,
147 auth_store: Option<Arc<AuthStore>>,
148 replication: Option<Arc<ServerReplicationState>>,
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152enum ServerlessWarmupScope {
153 Indexes,
154 GraphProjections,
155 AnalyticsJobs,
156 NativeArtifacts,
157}
158
159#[derive(Debug, Clone, Copy, PartialEq, Eq)]
160enum DeploymentProfile {
161 Embedded,
162 Server,
163 Serverless,
164}
165
166#[derive(Debug, Clone)]
167struct ParsedQueryRequest {
168 query: String,
169 entity_types: Option<Vec<String>>,
170 capabilities: Option<Vec<String>>,
171 params: Option<Vec<Value>>,
175}
176
177#[derive(Debug, Clone, Copy)]
178enum PatchOperationType {
179 Set,
180 Replace,
181 Unset,
182}
183
184#[derive(Debug, Clone)]
185struct PatchOperation {
186 op: PatchOperationType,
187 path: Vec<String>,
188 value: Option<JsonValue>,
189}
190
191impl RedDBServer {
192 pub fn new(runtime: RedDBRuntime) -> Self {
193 Self::with_options(runtime, ServerOptions::default())
194 }
195
196 pub fn from_database_options(
197 db_options: RedDBOptions,
198 server_options: ServerOptions,
199 ) -> RedDBResult<Self> {
200 let runtime = RedDBRuntime::with_options(db_options)?;
201 Ok(Self::with_options(runtime, server_options))
202 }
203
204 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
205 Self {
206 runtime,
207 options,
208 auth_store: None,
209 replication: None,
210 }
211 }
212
213 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
217 self.runtime.set_auth_store(Arc::clone(&auth_store));
218 self.auth_store = Some(auth_store);
219 self
220 }
221
222 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
224 self.replication = Some(state);
225 self
226 }
227
228 pub fn runtime(&self) -> &RedDBRuntime {
229 &self.runtime
230 }
231
232 pub fn options(&self) -> &ServerOptions {
233 &self.options
234 }
235
236 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
237 QueryUseCases::new(&self.runtime)
238 }
239
240 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
241 AdminUseCases::new(&self.runtime)
242 }
243
244 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
245 EntityUseCases::new(&self.runtime)
246 }
247
248 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
249 CatalogUseCases::new(&self.runtime)
250 }
251
252 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
253 GraphUseCases::new(&self.runtime)
254 }
255
256 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
257 NativeUseCases::new(&self.runtime)
258 }
259
260 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
261 TreeUseCases::new(&self.runtime)
262 }
263
264 pub fn serve(&self) -> io::Result<()> {
265 let listener = TcpListener::bind(&self.options.bind_addr)?;
266 self.serve_on(listener)
267 }
268
269 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
270 for stream in listener.incoming() {
271 match stream {
272 Ok(stream) => {
273 let server = self.clone();
275 thread::spawn(move || {
276 let _ = server.handle_connection(stream);
277 });
278 }
279 Err(err) => return Err(err),
280 }
281 }
282 Ok(())
283 }
284
285 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
286 let server = self.clone();
287 thread::spawn(move || server.serve())
288 }
289
290 pub fn serve_in_background_on(
291 &self,
292 listener: TcpListener,
293 ) -> thread::JoinHandle<io::Result<()>> {
294 let server = self.clone();
295 thread::spawn(move || server.serve_on(listener))
296 }
297
298 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
302 let listener = TcpListener::bind(&self.options.bind_addr)?;
303 self.serve_tls_on(listener, tls_config)
304 }
305
306 pub fn serve_tls_on(
307 &self,
308 listener: TcpListener,
309 tls_config: std::sync::Arc<rustls::ServerConfig>,
310 ) -> io::Result<()> {
311 for stream in listener.incoming() {
312 match stream {
313 Ok(stream) => {
314 let server = self.clone();
315 let cfg = tls_config.clone();
316 thread::spawn(move || {
317 let _ = server.handle_tls_connection(stream, cfg);
318 });
319 }
320 Err(err) => return Err(err),
321 }
322 }
323 Ok(())
324 }
325
326 pub fn serve_tls_in_background(
327 &self,
328 tls_config: std::sync::Arc<rustls::ServerConfig>,
329 ) -> thread::JoinHandle<io::Result<()>> {
330 let server = self.clone();
331 thread::spawn(move || server.serve_tls(tls_config))
332 }
333
334 pub fn serve_tls_in_background_on(
335 &self,
336 listener: TcpListener,
337 tls_config: std::sync::Arc<rustls::ServerConfig>,
338 ) -> thread::JoinHandle<io::Result<()>> {
339 let server = self.clone();
340 thread::spawn(move || server.serve_tls_on(listener, tls_config))
341 }
342
343 fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
344 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
345 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
346
347 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
348 if self.try_route_streaming(&request, &mut stream)? {
349 return Ok(());
350 }
351 let response = self.route(request);
352 stream.write_all(&response.to_http_bytes())?;
353 stream.flush()?;
354 Ok(())
355 }
356
357 fn handle_tls_connection(
358 &self,
359 tcp: TcpStream,
360 tls_config: std::sync::Arc<rustls::ServerConfig>,
361 ) -> io::Result<()> {
362 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
363 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
364 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
365 Ok(s) => s,
366 Err(err) => {
367 tracing::warn!(
368 target: "reddb::http_tls",
369 err = %err,
370 "TLS handshake failed"
371 );
372 return Err(err);
373 }
374 };
375 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
376 Ok(req) => req,
377 Err(err) => {
378 tracing::warn!(
379 target: "reddb::http_tls",
380 err = %err,
381 "TLS request parse failed"
382 );
383 return Err(err);
384 }
385 };
386 if self.try_route_streaming(&request, &mut tls_stream)? {
387 return Ok(());
388 }
389 let response = self.route(request);
390 tls_stream.write_all(&response.to_http_bytes())?;
391 tls_stream.flush()?;
392 Ok(())
393 }
394}