1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::thread;
22use std::time::{Duration, SystemTime, UNIX_EPOCH};
23
24use std::sync::Arc;
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::auth::store::AuthStore;
28use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
29use crate::health::{HealthProvider, HealthReport, HealthState};
30use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
31use crate::runtime::{
32 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
33 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
34 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
35 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
36 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
37 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
38 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
39 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
40};
41use crate::storage::schema::Value;
42use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
43use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
44use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
45use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
46
47fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
48 crate::presentation::admin_json::analytics_job_json(job)
49}
50
51fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
52 crate::presentation::admin_json::graph_projection_json(projection)
53}
54
55pub mod handlers_admin;
56mod handlers_ai;
57mod handlers_auth;
58mod handlers_backup;
59mod handlers_ec;
60mod handlers_entity;
61mod handlers_geo;
62mod handlers_graph;
63mod handlers_keyed;
64mod handlers_log;
65mod handlers_ops;
66mod handlers_query;
67mod handlers_replication;
68mod handlers_vcs;
69mod handlers_vector;
70pub mod header_escape_guard;
71pub mod ingest_pipeline;
72mod patch_support;
73mod request_body;
74mod request_context;
75mod routing;
76mod serverless_support;
77pub mod tls;
78mod transport;
79
80use self::handlers_ai::*;
81use self::handlers_entity::*;
82use self::handlers_graph::*;
83use self::handlers_keyed::*;
84use self::handlers_ops::*;
85use self::handlers_query::*;
86use self::patch_support::*;
87use self::request_body::*;
88use self::routing::*;
89use self::serverless_support::*;
90use self::transport::*;
91
/// Which surface a server instance exposes.
///
/// Carried in [`ServerOptions::surface`]; the default options use
/// [`ServerSurface::Public`].
///
/// NOTE(review): the variants are interpreted outside this definition
/// (presumably by the routing layer) — confirm the exact route sets there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerSurface {
    /// The default surface.
    Public,
    /// Presumably restricts the listener to administrative routes — confirm.
    AdminOnly,
    /// Presumably restricts the listener to metrics routes — confirm.
    MetricsOnly,
}
110
111#[derive(Debug, Clone)]
112pub struct ServerOptions {
113 pub bind_addr: String,
114 pub max_body_bytes: usize,
115 pub read_timeout_ms: u64,
116 pub write_timeout_ms: u64,
117 pub max_scan_limit: usize,
118 pub surface: ServerSurface,
122}
123
124impl Default for ServerOptions {
125 fn default() -> Self {
126 Self {
127 bind_addr: "127.0.0.1:5055".to_string(),
128 max_body_bytes: 1024 * 1024,
129 read_timeout_ms: 5_000,
130 write_timeout_ms: 5_000,
131 max_scan_limit: 1_000,
132 surface: ServerSurface::Public,
133 }
134 }
135}
136
137pub struct ServerReplicationState {
139 pub config: crate::replication::ReplicationConfig,
140 pub primary: Option<crate::replication::primary::PrimaryReplication>,
141}
142
143#[derive(Clone)]
144pub struct RedDBServer {
145 runtime: RedDBRuntime,
146 options: ServerOptions,
147 auth_store: Option<Arc<AuthStore>>,
148 replication: Option<Arc<ServerReplicationState>>,
149}
150
/// Category of state that a serverless warm-up can target.
///
/// NOTE(review): consumed outside this definition (presumably by
/// `serverless_support`) — confirm what each scope actually preloads.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ServerlessWarmupScope {
    /// Index structures.
    Indexes,
    /// Graph projections.
    GraphProjections,
    /// Analytics jobs.
    AnalyticsJobs,
    /// Native artifacts.
    NativeArtifacts,
}
158
/// Deployment mode of a RedDB instance.
///
/// NOTE(review): how a profile is selected and what it changes is decided
/// outside this definition — confirm against its users.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeploymentProfile {
    /// Embedded deployment.
    Embedded,
    /// Server deployment.
    Server,
    /// Serverless deployment.
    Serverless,
}
165
/// A query request after its payload has been parsed.
///
/// NOTE(review): populated outside this definition (presumably by the query
/// handlers) — confirm how the optional lists are interpreted when absent.
#[derive(Clone, Debug)]
struct ParsedQueryRequest {
    /// Query text to execute.
    query: String,
    /// Optional list of entity type names supplied with the request.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names supplied with the request.
    capabilities: Option<Vec<String>>,
}
172
/// Kind of change a [`PatchOperation`] applies.
///
/// NOTE(review): the precise difference between `Set` and `Replace` is
/// implemented elsewhere (presumably `patch_support`) — confirm there.
#[derive(Clone, Copy, Debug)]
enum PatchOperationType {
    /// Set a value at the target path.
    Set,
    /// Replace the value at the target path.
    Replace,
    /// Remove the value at the target path.
    Unset,
}
179
180#[derive(Debug, Clone)]
181struct PatchOperation {
182 op: PatchOperationType,
183 path: Vec<String>,
184 value: Option<JsonValue>,
185}
186
187impl RedDBServer {
188 pub fn new(runtime: RedDBRuntime) -> Self {
189 Self::with_options(runtime, ServerOptions::default())
190 }
191
192 pub fn from_database_options(
193 db_options: RedDBOptions,
194 server_options: ServerOptions,
195 ) -> RedDBResult<Self> {
196 let runtime = RedDBRuntime::with_options(db_options)?;
197 Ok(Self::with_options(runtime, server_options))
198 }
199
200 pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
201 Self {
202 runtime,
203 options,
204 auth_store: None,
205 replication: None,
206 }
207 }
208
209 pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
213 self.runtime.set_auth_store(Arc::clone(&auth_store));
214 self.auth_store = Some(auth_store);
215 self
216 }
217
218 pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
220 self.replication = Some(state);
221 self
222 }
223
224 pub fn runtime(&self) -> &RedDBRuntime {
225 &self.runtime
226 }
227
228 pub fn options(&self) -> &ServerOptions {
229 &self.options
230 }
231
232 fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
233 QueryUseCases::new(&self.runtime)
234 }
235
236 fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
237 AdminUseCases::new(&self.runtime)
238 }
239
240 fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
241 EntityUseCases::new(&self.runtime)
242 }
243
244 fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
245 CatalogUseCases::new(&self.runtime)
246 }
247
248 fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
249 GraphUseCases::new(&self.runtime)
250 }
251
252 fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
253 NativeUseCases::new(&self.runtime)
254 }
255
256 fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
257 TreeUseCases::new(&self.runtime)
258 }
259
260 pub fn serve(&self) -> io::Result<()> {
261 let listener = TcpListener::bind(&self.options.bind_addr)?;
262 self.serve_on(listener)
263 }
264
265 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
266 for stream in listener.incoming() {
267 match stream {
268 Ok(stream) => {
269 let server = self.clone();
271 thread::spawn(move || {
272 let _ = server.handle_connection(stream);
273 });
274 }
275 Err(err) => return Err(err),
276 }
277 }
278 Ok(())
279 }
280
281 pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
282 let server = self.clone();
283 thread::spawn(move || server.serve())
284 }
285
286 pub fn serve_in_background_on(
287 &self,
288 listener: TcpListener,
289 ) -> thread::JoinHandle<io::Result<()>> {
290 let server = self.clone();
291 thread::spawn(move || server.serve_on(listener))
292 }
293
294 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
298 let listener = TcpListener::bind(&self.options.bind_addr)?;
299 self.serve_tls_on(listener, tls_config)
300 }
301
302 pub fn serve_tls_on(
303 &self,
304 listener: TcpListener,
305 tls_config: std::sync::Arc<rustls::ServerConfig>,
306 ) -> io::Result<()> {
307 for stream in listener.incoming() {
308 match stream {
309 Ok(stream) => {
310 let server = self.clone();
311 let cfg = tls_config.clone();
312 thread::spawn(move || {
313 let _ = server.handle_tls_connection(stream, cfg);
314 });
315 }
316 Err(err) => return Err(err),
317 }
318 }
319 Ok(())
320 }
321
322 pub fn serve_tls_in_background(
323 &self,
324 tls_config: std::sync::Arc<rustls::ServerConfig>,
325 ) -> thread::JoinHandle<io::Result<()>> {
326 let server = self.clone();
327 thread::spawn(move || server.serve_tls(tls_config))
328 }
329
330 pub fn serve_tls_in_background_on(
331 &self,
332 listener: TcpListener,
333 tls_config: std::sync::Arc<rustls::ServerConfig>,
334 ) -> thread::JoinHandle<io::Result<()>> {
335 let server = self.clone();
336 thread::spawn(move || server.serve_tls_on(listener, tls_config))
337 }
338
339 fn handle_connection(&self, mut stream: TcpStream) -> io::Result<()> {
340 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
341 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
342
343 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
344 let response = self.route(request);
345 stream.write_all(&response.to_http_bytes())?;
346 stream.flush()?;
347 Ok(())
348 }
349
350 fn handle_tls_connection(
351 &self,
352 tcp: TcpStream,
353 tls_config: std::sync::Arc<rustls::ServerConfig>,
354 ) -> io::Result<()> {
355 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
356 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
357 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
358 Ok(s) => s,
359 Err(err) => {
360 tracing::warn!(
361 target: "reddb::http_tls",
362 err = %err,
363 "TLS handshake failed"
364 );
365 return Err(err);
366 }
367 };
368 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
369 Ok(req) => req,
370 Err(err) => {
371 tracing::warn!(
372 target: "reddb::http_tls",
373 err = %err,
374 "TLS request parse failed"
375 );
376 return Err(err);
377 }
378 };
379 let response = self.route(request);
380 tls_stream.write_all(&response.to_http_bytes())?;
381 tls_stream.flush()?;
382 Ok(())
383 }
384}