1pub(crate) use crate::application::json_input::{
4 json_bool_field, json_f32_field, json_string_field, json_usize_field,
5};
6pub(crate) use crate::application::{
7 AdminUseCases, CatalogUseCases, CreateDocumentInput, CreateEdgeInput, CreateEntityOutput,
8 CreateKvInput, CreateNodeEmbeddingInput, CreateNodeGraphLinkInput, CreateNodeInput,
9 CreateNodeTableLinkInput, CreateRowInput, CreateVectorInput, DeleteEntityInput, EntityUseCases,
10 ExecuteQueryInput, ExplainQueryInput, GraphCentralityInput, GraphClusteringInput,
11 GraphCommunitiesInput, GraphComponentsInput, GraphCyclesInput, GraphHitsInput,
12 GraphNeighborhoodInput, GraphPersonalizedPageRankInput, GraphShortestPathInput,
13 GraphTopologicalSortInput, GraphTraversalInput, GraphUseCases, InspectNativeArtifactInput,
14 NativeUseCases, PatchEntityInput, PatchEntityOperation, PatchEntityOperationType,
15 QueryUseCases, SearchHybridInput, SearchIvfInput, SearchMultimodalInput, SearchSimilarInput,
16 SearchTextInput, TreeUseCases,
17};
18use std::collections::{BTreeMap, HashMap};
19use std::io::{self, Read, Write};
20use std::net::{TcpListener, TcpStream};
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25use std::sync::Arc;
26
27use crate::api::{RedDBError, RedDBOptions, RedDBResult};
28use crate::auth::store::AuthStore;
29use crate::catalog::{CatalogModelSnapshot, CollectionDescriptor, CollectionModel, SchemaMode};
30use crate::health::{HealthProvider, HealthReport, HealthState};
31use crate::json::{parse_json, to_vec as json_to_vec, Map, Value as JsonValue};
32use crate::runtime::{
33 RedDBRuntime, RuntimeFilter, RuntimeFilterValue, RuntimeGraphCentralityAlgorithm,
34 RuntimeGraphCentralityResult, RuntimeGraphClusteringResult, RuntimeGraphCommunityAlgorithm,
35 RuntimeGraphCommunityResult, RuntimeGraphComponentsMode, RuntimeGraphComponentsResult,
36 RuntimeGraphCyclesResult, RuntimeGraphDirection, RuntimeGraphHitsResult,
37 RuntimeGraphNeighborhoodResult, RuntimeGraphPathAlgorithm, RuntimeGraphPathResult,
38 RuntimeGraphPattern, RuntimeGraphProjection, RuntimeGraphTopologicalSortResult,
39 RuntimeGraphTraversalResult, RuntimeGraphTraversalStrategy, RuntimeIvfSearchResult,
40 RuntimeQueryWeights, RuntimeStats, ScanCursor, ScanPage,
41};
42use crate::storage::schema::Value;
43use crate::storage::unified::devx::refs::{NodeRef, TableRef, VectorRef};
44use crate::storage::unified::dsl::{MatchComponents, QueryResult as DslQueryResult};
45use crate::storage::unified::{MetadataValue, RefTarget, SparseVector};
46use crate::storage::{CrossRef, EntityData, EntityId, EntityKind, SimilarResult, UnifiedEntity};
47
/// Renders an analytics job as JSON.
///
/// Thin module-local alias for
/// `crate::presentation::admin_json::analytics_job_json`.
fn analytics_job_json(job: &crate::PhysicalAnalyticsJob) -> JsonValue {
    crate::presentation::admin_json::analytics_job_json(job)
}
51
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::health::HealthReport;
    use crate::service_cli::{
        TransportListenerFailure, TransportListenerState, TransportReadiness,
    };

    /// The default HTTP body cap must be exactly 32 MiB.
    #[test]
    fn server_options_default_http_body_limit_is_32_mib() {
        let defaults = ServerOptions::default();
        assert_eq!(defaults.max_body_bytes, 32 * 1024 * 1024);
    }

    /// The health payload must expose one `active` and one `failed` listener
    /// under `transport_listeners` when the options carry one of each.
    #[test]
    fn health_json_reports_transport_listeners() {
        let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime");

        let grpc_listener = TransportListenerState {
            transport: "grpc".to_string(),
            bind_addr: "127.0.0.1:50051".to_string(),
            explicit: true,
        };
        let http_failure = TransportListenerFailure {
            transport: "http".to_string(),
            bind_addr: "127.0.0.1:5055".to_string(),
            explicit: false,
            reason: "http listener bind 127.0.0.1:5055: address in use".to_string(),
        };
        let options = ServerOptions {
            transport_readiness: TransportReadiness {
                active: vec![grpc_listener],
                failed: vec![http_failure],
            },
            ..ServerOptions::default()
        };
        let server = RedDBServer::with_options(runtime, options);

        let root = match server.health_json_with_transport(&HealthReport::healthy()) {
            JsonValue::Object(root) => root,
            _ => panic!("health payload should be an object"),
        };
        let listeners = match root.get("transport_listeners") {
            Some(JsonValue::Object(listeners)) => listeners,
            _ => panic!("health payload should include transport_listeners"),
        };
        let active = match listeners.get("active") {
            Some(JsonValue::Array(active)) => active,
            _ => panic!("transport_listeners.active should be an array"),
        };
        let failed = match listeners.get("failed") {
            Some(JsonValue::Array(failed)) => failed,
            _ => panic!("transport_listeners.failed should be an array"),
        };

        assert_eq!(active.len(), 1);
        assert_eq!(failed.len(), 1);
    }
}
103
/// Renders a graph projection as JSON.
///
/// Thin module-local alias for
/// `crate::presentation::admin_json::graph_projection_json`.
fn graph_projection_json(projection: &crate::PhysicalGraphProjection) -> JsonValue {
    crate::presentation::admin_json::graph_projection_json(projection)
}
107
108pub mod handlers_admin;
109mod handlers_ai;
110mod handlers_ai_model_cache;
111mod handlers_auth;
112mod handlers_backup;
113mod handlers_ec;
114pub(crate) mod handlers_entity;
115mod handlers_geo;
116mod handlers_graph;
117mod handlers_keyed;
118mod handlers_log;
119mod handlers_metrics;
120mod handlers_ops;
121mod handlers_query;
122mod handlers_replication;
123mod handlers_vcs;
124mod handlers_vector;
125pub mod header_escape_guard;
126pub mod http_connection_limiter;
127pub mod http_handler_metrics;
128pub mod http_limits;
129pub mod ingest_pipeline;
130mod patch_support;
131mod request_body;
132mod request_context;
133mod routing;
134mod serverless_support;
135pub mod tls;
136mod transport;
137
138use self::handlers_ai::*;
139use self::handlers_entity::*;
140use self::handlers_graph::*;
141use self::handlers_keyed::*;
142use self::handlers_metrics::*;
143use self::handlers_ops::*;
144use self::handlers_query::*;
145use self::http_connection_limiter::{
146 HandlerDeadline, HttpConnectionLimiter, MonotonicClock, SystemMonotonicClock,
147};
148use self::http_handler_metrics::{HttpHandlerMetrics, HttpRejectReason, HttpTransport};
149pub use self::http_limits::{
150 HttpLimitsCliInput, HttpLimitsResolved, DEFAULT_HANDLER_TIMEOUT_MS, DEFAULT_RETRY_AFTER_SECS,
151};
152use self::patch_support::*;
153use self::request_body::*;
154use self::routing::*;
155use self::serverless_support::*;
156use self::transport::*;
157
/// Selects which surface a server instance exposes.
///
/// `ServerOptions::default()` uses [`ServerSurface::Public`]. The code that
/// interprets each variant is not in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerSurface {
    /// Default surface.
    Public,
    // NOTE(review): presumably restricts the listener to admin routes — confirm in routing.
    AdminOnly,
    // NOTE(review): presumably restricts the listener to metrics routes — confirm in routing.
    MetricsOnly,
}
176
/// Configuration for a [`RedDBServer`] listener.
#[derive(Debug, Clone)]
pub struct ServerOptions {
    /// Address that `serve` / `serve_tls` bind their `TcpListener` to.
    pub bind_addr: String,
    /// Body size cap handed to `HttpRequest::read_from` for every request.
    pub max_body_bytes: usize,
    /// Per-connection socket read timeout, in milliseconds.
    pub read_timeout_ms: u64,
    /// Per-connection socket write timeout, in milliseconds.
    pub write_timeout_ms: u64,
    // NOTE(review): not read in this part of the file; presumably an upper
    // bound on scan page sizes — confirm against the handlers.
    pub max_scan_limit: usize,
    /// Which surface this server instance exposes.
    pub surface: ServerSurface,
    /// Listener bind outcomes reported under `transport_listeners` in the
    /// discovery and health payloads.
    pub transport_readiness: crate::service_cli::TransportReadiness,
}
190
/// Default HTTP request body cap used by `ServerOptions::default()`: 32 MiB.
pub const DEFAULT_HTTP_MAX_BODY_BYTES: usize = 32 * 1024 * 1024;
192
impl Default for ServerOptions {
    /// Defaults: bind to `127.0.0.1:5055`, 32 MiB body cap, 5 s read and
    /// write timeouts, scan limit of 1000, the public surface, and a default
    /// (as defined by `TransportReadiness::default()`) readiness report.
    fn default() -> Self {
        Self {
            bind_addr: "127.0.0.1:5055".to_string(),
            max_body_bytes: DEFAULT_HTTP_MAX_BODY_BYTES,
            read_timeout_ms: 5_000,
            write_timeout_ms: 5_000,
            max_scan_limit: 1_000,
            surface: ServerSurface::Public,
            transport_readiness: crate::service_cli::TransportReadiness::default(),
        }
    }
}
206
/// Replication state attached to a server through
/// `RedDBServer::with_replication`.
pub struct ServerReplicationState {
    /// Replication configuration for this node.
    pub config: crate::replication::ReplicationConfig,
    // NOTE(review): presumably `Some` only when this node acts as a primary — confirm.
    pub primary: Option<crate::replication::primary::PrimaryReplication>,
}
212
/// HTTP/HTTPS front end over a [`RedDBRuntime`].
///
/// The accept loops clone the server once per accepted connection and move
/// the clone into the handler thread.
#[derive(Clone)]
pub struct RedDBServer {
    /// Runtime that every use-case facade borrows.
    runtime: RedDBRuntime,
    /// Listener configuration.
    options: ServerOptions,
    /// Auth store installed by `with_auth`; `None` until then.
    auth_store: Option<Arc<AuthStore>>,
    /// Replication state installed by `with_replication`; `None` until then.
    replication: Option<Arc<ServerReplicationState>>,
    /// Hands out one permit per accepted connection; when no permit is
    /// available the connection is rejected instead of handled.
    http_limiter: HttpConnectionLimiter,
    /// Time budget given to the `HandlerDeadline` armed for each connection.
    handler_timeout: Duration,
    /// Clock the `HandlerDeadline` measures against (replaceable for tests).
    handler_clock: Arc<dyn MonotonicClock>,
    /// Artificial delay in milliseconds slept after routing when non-zero;
    /// set through `set_test_slow_inject_ms`.
    slow_inject_ms: Arc<AtomicU64>,
    /// Counters for rejections and connection durations.
    http_metrics: HttpHandlerMetrics,
    /// Value rendered into the `Retry-After` header of the cap-exhausted 503.
    retry_after_secs: u64,
    /// Pre-rendered 503 response written when the limiter has no permit.
    reject_503_bytes: Arc<Vec<u8>>,
}
259
/// Handler time budget installed by `RedDBServer::with_options`: 30 seconds.
// NOTE(review): `http_limits` also exports `DEFAULT_HANDLER_TIMEOUT_MS`;
// confirm the two defaults agree.
const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_millis(30_000);
262
/// Kinds of state a serverless warmup can target.
// NOTE(review): not referenced in this part of the file; presumably consumed
// by `serverless_support` — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ServerlessWarmupScope {
    Indexes,
    GraphProjections,
    AnalyticsJobs,
    NativeArtifacts,
}
270
/// Deployment profiles known to the server.
// NOTE(review): not referenced in this part of the file; presumably backs the
// `GET /deployment/profiles` endpoint advertised by root discovery — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeploymentProfile {
    Embedded,
    Server,
    Serverless,
}
277
/// Decodes `%XX` escapes in a single URL path segment.
///
/// Bytes other than `%` are copied through unchanged (`+` is not treated as a
/// space). The decoded bytes must form valid UTF-8.
///
/// # Errors
///
/// * `"truncated percent escape"` when a `%` is followed by fewer than two bytes.
/// * `"invalid percent escape"` when either of the two bytes is not a hex digit.
/// * `"path segment is not valid UTF-8"` when the decoded bytes are not UTF-8.
fn percent_decode_path_segment(input: &str) -> Result<String, String> {
    let raw = input.as_bytes();
    let mut decoded = Vec::with_capacity(raw.len());
    let mut cursor = 0;

    while let Some(&current) = raw.get(cursor) {
        if current != b'%' {
            decoded.push(current);
            cursor += 1;
            continue;
        }

        // Both hex digits must exist before either one is validated.
        let (Some(&high_digit), Some(&low_digit)) = (raw.get(cursor + 1), raw.get(cursor + 2))
        else {
            return Err("truncated percent escape".to_string());
        };
        let Some(high) = hex_value(high_digit) else {
            return Err("invalid percent escape".to_string());
        };
        let Some(low) = hex_value(low_digit) else {
            return Err("invalid percent escape".to_string());
        };

        decoded.push((high << 4) | low);
        cursor += 3;
    }

    match String::from_utf8(decoded) {
        Ok(segment) => Ok(segment),
        Err(_) => Err("path segment is not valid UTF-8".to_string()),
    }
}

/// Returns the numeric value (0–15) of an ASCII hex digit, accepting both
/// cases, or `None` for any other byte.
fn hex_value(byte: u8) -> Option<u8> {
    // `to_digit(16)` only recognises ASCII `0-9`, `a-f`, `A-F`; bytes >= 0x80
    // map to non-ASCII chars and therefore yield `None`.
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
312
/// A query request after its body has been parsed.
// NOTE(review): constructed outside this part of the file; field meanings
// below are inferred from names and types — confirm against the parser.
#[derive(Debug, Clone)]
struct ParsedQueryRequest {
    /// Query text.
    query: String,
    /// Optional list of entity type names.
    entity_types: Option<Vec<String>>,
    /// Optional list of capability names.
    capabilities: Option<Vec<String>>,
    /// Optional positional parameter values.
    params: Option<Vec<Value>>,
}
323
/// Kind of change a [`PatchOperation`] applies.
// NOTE(review): the difference between `Set` and `Replace` is defined by the
// patch code outside this part of the file — confirm there.
#[derive(Debug, Clone, Copy)]
enum PatchOperationType {
    Set,
    Replace,
    Unset,
}
330
/// One parsed patch instruction.
#[derive(Debug, Clone)]
struct PatchOperation {
    /// Which kind of change to apply.
    op: PatchOperationType,
    /// Target location, one element per path component.
    path: Vec<String>,
    // NOTE(review): presumably `None` for `Unset` — confirm against the patch parser.
    value: Option<JsonValue>,
}
337
338impl RedDBServer {
    /// Builds a server around `runtime` using `ServerOptions::default()`.
    pub fn new(runtime: RedDBRuntime) -> Self {
        Self::with_options(runtime, ServerOptions::default())
    }
342
    /// Creates a runtime from `db_options` and wraps it in a server
    /// configured with `server_options`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `RedDBRuntime::with_options`.
    pub fn from_database_options(
        db_options: RedDBOptions,
        server_options: ServerOptions,
    ) -> RedDBResult<Self> {
        let runtime = RedDBRuntime::with_options(db_options)?;
        Ok(Self::with_options(runtime, server_options))
    }
350
    /// Builds a server around `runtime` with explicit `options`.
    ///
    /// The server starts with no auth store and no replication state, the
    /// limiter's default cap, `DEFAULT_HANDLER_TIMEOUT`, a system monotonic
    /// clock, no injected delay, fresh metrics, and a 503 rejection response
    /// pre-rendered with `DEFAULT_RETRY_AFTER_SECS`.
    pub fn with_options(runtime: RedDBRuntime, options: ServerOptions) -> Self {
        Self {
            runtime,
            options,
            auth_store: None,
            replication: None,
            http_limiter: HttpConnectionLimiter::with_default_cap(),
            handler_timeout: DEFAULT_HANDLER_TIMEOUT,
            handler_clock: Arc::new(SystemMonotonicClock::new()),
            slow_inject_ms: Arc::new(AtomicU64::new(0)),
            http_metrics: HttpHandlerMetrics::new(),
            retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
            // Rendered once here so the rejection path only has to write bytes.
            reject_503_bytes: Arc::new(build_reject_503_bytes(DEFAULT_RETRY_AFTER_SECS)),
        }
    }
366
    /// Returns the server's HTTP handler metrics.
    #[doc(hidden)]
    pub fn http_metrics(&self) -> &HttpHandlerMetrics {
        &self.http_metrics
    }
371
    /// Replaces the connection limiter with a new one built from `cap`.
    #[doc(hidden)]
    pub fn with_http_limiter_cap(mut self, cap: usize) -> Self {
        self.http_limiter = HttpConnectionLimiter::new(cap);
        self
    }
381
382 pub fn with_http_limits(mut self, limits: HttpLimitsResolved) -> Self {
387 self.http_limiter = HttpConnectionLimiter::new(limits.max_handlers);
388 self.handler_timeout = Duration::from_millis(limits.handler_timeout_ms);
389 self.retry_after_secs = limits.retry_after_secs;
390 self.reject_503_bytes = Arc::new(build_reject_503_bytes(limits.retry_after_secs));
391 self
392 }
393
    /// Returns the `Retry-After` value, in seconds, currently configured.
    #[doc(hidden)]
    pub fn retry_after_secs(&self) -> u64 {
        self.retry_after_secs
    }
398
    /// Returns the connection limiter used by the accept loops.
    #[doc(hidden)]
    pub fn http_limiter(&self) -> &HttpConnectionLimiter {
        &self.http_limiter
    }
403
    /// Overrides the per-connection handler time budget.
    #[doc(hidden)]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = timeout;
        self
    }
411
    /// Returns the per-connection handler time budget.
    #[doc(hidden)]
    pub fn handler_timeout(&self) -> Duration {
        self.handler_timeout
    }
416
    /// Replaces the clock that handler deadlines are measured against.
    #[doc(hidden)]
    pub fn with_handler_clock(mut self, clock: Arc<dyn MonotonicClock>) -> Self {
        self.handler_clock = clock;
        self
    }
426
    /// Sets the artificial delay, in milliseconds, that connection handlers
    /// sleep after routing a request. `0` disables the delay.
    ///
    /// The value lives behind an `Arc`, so it is shared with clones of this
    /// server.
    #[doc(hidden)]
    pub fn set_test_slow_inject_ms(&self, ms: u64) {
        self.slow_inject_ms.store(ms, Ordering::Relaxed);
    }
436
    /// Attaches an auth store to both the runtime and the server.
    pub fn with_auth(mut self, auth_store: Arc<AuthStore>) -> Self {
        // The runtime gets its own handle to the same store.
        self.runtime.set_auth_store(Arc::clone(&auth_store));
        self.auth_store = Some(auth_store);
        self
    }
445
    /// Attaches replication state to the server.
    pub fn with_replication(mut self, state: Arc<ServerReplicationState>) -> Self {
        self.replication = Some(state);
        self
    }
451
    /// Returns the runtime this server fronts.
    pub fn runtime(&self) -> &RedDBRuntime {
        &self.runtime
    }
455
    /// Returns the options this server was built with.
    pub fn options(&self) -> &ServerOptions {
        &self.options
    }
459
    /// Builds a query use-case facade borrowing this server's runtime.
    fn query_use_cases(&self) -> QueryUseCases<'_, RedDBRuntime> {
        QueryUseCases::new(&self.runtime)
    }
463
    /// Builds an admin use-case facade borrowing this server's runtime.
    fn admin_use_cases(&self) -> AdminUseCases<'_, RedDBRuntime> {
        AdminUseCases::new(&self.runtime)
    }
467
    /// Builds an entity use-case facade borrowing this server's runtime.
    fn entity_use_cases(&self) -> EntityUseCases<'_, RedDBRuntime> {
        EntityUseCases::new(&self.runtime)
    }
471
    /// Builds a catalog use-case facade borrowing this server's runtime.
    fn catalog_use_cases(&self) -> CatalogUseCases<'_, RedDBRuntime> {
        CatalogUseCases::new(&self.runtime)
    }
475
    /// Builds a graph use-case facade borrowing this server's runtime.
    fn graph_use_cases(&self) -> GraphUseCases<'_, RedDBRuntime> {
        GraphUseCases::new(&self.runtime)
    }
479
    /// Builds a native use-case facade borrowing this server's runtime.
    fn native_use_cases(&self) -> NativeUseCases<'_, RedDBRuntime> {
        NativeUseCases::new(&self.runtime)
    }
483
    /// Builds a tree use-case facade borrowing this server's runtime.
    fn tree_use_cases(&self) -> TreeUseCases<'_, RedDBRuntime> {
        TreeUseCases::new(&self.runtime)
    }
487
488 fn transport_readiness_json(&self) -> JsonValue {
489 let active = self
490 .options
491 .transport_readiness
492 .active
493 .iter()
494 .map(|listener| {
495 let mut object = Map::new();
496 object.insert(
497 "transport".to_string(),
498 JsonValue::String(listener.transport.clone()),
499 );
500 object.insert(
501 "bind_addr".to_string(),
502 JsonValue::String(listener.bind_addr.clone()),
503 );
504 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
505 JsonValue::Object(object)
506 })
507 .collect();
508 let failed = self
509 .options
510 .transport_readiness
511 .failed
512 .iter()
513 .map(|listener| {
514 let mut object = Map::new();
515 object.insert(
516 "transport".to_string(),
517 JsonValue::String(listener.transport.clone()),
518 );
519 object.insert(
520 "bind_addr".to_string(),
521 JsonValue::String(listener.bind_addr.clone()),
522 );
523 object.insert("explicit".to_string(), JsonValue::Bool(listener.explicit));
524 object.insert(
525 "reason".to_string(),
526 JsonValue::String(listener.reason.clone()),
527 );
528 JsonValue::Object(object)
529 })
530 .collect();
531
532 let mut object = Map::new();
533 object.insert("active".to_string(), JsonValue::Array(active));
534 object.insert("failed".to_string(), JsonValue::Array(failed));
535 JsonValue::Object(object)
536 }
537
538 fn handle_grpc_discovery(&self) -> HttpResponse {
539 let mut methods = Map::new();
540 methods.insert(
541 "query".to_string(),
542 JsonValue::String("reddb.v1.RedDB/Query".to_string()),
543 );
544 methods.insert(
545 "batch_query".to_string(),
546 JsonValue::String("reddb.v1.RedDB/BatchQuery".to_string()),
547 );
548 methods.insert(
549 "health".to_string(),
550 JsonValue::String("reddb.v1.RedDB/Health".to_string()),
551 );
552 methods.insert(
553 "prepare".to_string(),
554 JsonValue::String("reddb.v1.RedDB/Prepare".to_string()),
555 );
556 methods.insert(
557 "execute_prepared".to_string(),
558 JsonValue::String("reddb.v1.RedDB/ExecutePrepared".to_string()),
559 );
560
561 let mut examples = Map::new();
562 examples.insert(
563 "query".to_string(),
564 JsonValue::String(
565 "grpcurl -plaintext -d '{\"query\":\"SELECT 1\"}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
566 .to_string(),
567 ),
568 );
569 examples.insert(
570 "query_with_params".to_string(),
571 JsonValue::String(
572 "grpcurl -plaintext -d '{\"query\":\"SELECT $1 AS value\",\"params\":[{\"intValue\":42}]}' 127.0.0.1:50051 reddb.v1.RedDB/Query"
573 .to_string(),
574 ),
575 );
576 examples.insert(
577 "health".to_string(),
578 JsonValue::String(
579 "grpcurl -plaintext -d '{}' 127.0.0.1:50051 reddb.v1.RedDB/Health".to_string(),
580 ),
581 );
582
583 let mut object = Map::new();
584 object.insert("ok".to_string(), JsonValue::Bool(true));
585 object.insert(
586 "service".to_string(),
587 JsonValue::String("reddb.v1.RedDB".to_string()),
588 );
589 object.insert(
590 "package".to_string(),
591 JsonValue::String("reddb.v1".to_string()),
592 );
593 object.insert(
594 "proto".to_string(),
595 JsonValue::String("crates/reddb-grpc-proto/proto/reddb.proto".to_string()),
596 );
597 object.insert("methods".to_string(), JsonValue::Object(methods));
598 object.insert("examples".to_string(), JsonValue::Object(examples));
599 object.insert(
600 "transport_listeners".to_string(),
601 self.transport_readiness_json(),
602 );
603 object.insert(
604 "hint".to_string(),
605 JsonValue::String(
606 "If grpcurl cannot list services, pass the proto file with -import-path crates/reddb-grpc-proto/proto -proto reddb.proto."
607 .to_string(),
608 ),
609 );
610 json_response(200, JsonValue::Object(object))
611 }
612
613 fn handle_query_contract(&self) -> HttpResponse {
614 let mut examples = Map::new();
615 examples.insert(
616 "raw_sql".to_string(),
617 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
618 );
619 examples.insert(
620 "json_query".to_string(),
621 JsonValue::String(
622 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
623 .to_string(),
624 ),
625 );
626 examples.insert(
627 "json_query_with_params".to_string(),
628 JsonValue::String(
629 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
630 .to_string(),
631 ),
632 );
633
634 let mut request_body = Map::new();
635 request_body.insert(
636 "query".to_string(),
637 JsonValue::String("required string".to_string()),
638 );
639 request_body.insert(
640 "params".to_string(),
641 JsonValue::String("optional array".to_string()),
642 );
643
644 let mut response_shape = Map::new();
645 response_shape.insert(
646 "columns".to_string(),
647 JsonValue::String("projected column names".to_string()),
648 );
649 response_shape.insert(
650 "records[].values".to_string(),
651 JsonValue::String("only projected values".to_string()),
652 );
653 response_shape.insert(
654 "records[].meta".to_string(),
655 JsonValue::String("internal metadata when present".to_string()),
656 );
657
658 let mut object = Map::new();
659 object.insert("ok".to_string(), JsonValue::Bool(false));
660 object.insert(
661 "code".to_string(),
662 JsonValue::String("method_not_allowed".to_string()),
663 );
664 object.insert(
665 "message".to_string(),
666 JsonValue::String("/query accepts POST requests".to_string()),
667 );
668 object.insert(
669 "hint".to_string(),
670 JsonValue::String(
671 "Send raw SQL in the body, or JSON with a string 'query' field.".to_string(),
672 ),
673 );
674 object.insert("method".to_string(), JsonValue::String("POST".to_string()));
675 object.insert("path".to_string(), JsonValue::String("/query".to_string()));
676 object.insert("request_body".to_string(), JsonValue::Object(request_body));
677 object.insert(
678 "response_shape".to_string(),
679 JsonValue::Object(response_shape),
680 );
681 object.insert("examples".to_string(), JsonValue::Object(examples));
682 object.insert(
683 "docs".to_string(),
684 JsonValue::String("https://reddb.io/docs/query".to_string()),
685 );
686
687 json_response(405, JsonValue::Object(object))
688 .with_header("Allow", http::HeaderValue::from_static("POST"))
689 }
690
691 fn handle_root_discovery(&self) -> HttpResponse {
692 let mut endpoints = Map::new();
693 endpoints.insert(
694 "health".to_string(),
695 JsonValue::String("GET /health".to_string()),
696 );
697 endpoints.insert(
698 "ready".to_string(),
699 JsonValue::String("GET /ready".to_string()),
700 );
701 endpoints.insert(
702 "query".to_string(),
703 JsonValue::String("POST /query".to_string()),
704 );
705 endpoints.insert(
706 "query_readiness".to_string(),
707 JsonValue::String("GET /ready/query".to_string()),
708 );
709 endpoints.insert(
710 "catalog".to_string(),
711 JsonValue::String("GET /catalog".to_string()),
712 );
713 endpoints.insert(
714 "deployment_profiles".to_string(),
715 JsonValue::String("GET /deployment/profiles".to_string()),
716 );
717
718 let mut examples = Map::new();
719 examples.insert(
720 "http_raw_sql".to_string(),
721 JsonValue::String("curl -sS http://127.0.0.1:8080/query -d 'SELECT 1'".to_string()),
722 );
723 examples.insert(
724 "http_json_query".to_string(),
725 JsonValue::String(
726 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT 1\"}'"
727 .to_string(),
728 ),
729 );
730 examples.insert(
731 "http_json_query_with_params".to_string(),
732 JsonValue::String(
733 "curl -sS http://127.0.0.1:8080/query -H 'content-type: application/json' -d '{\"query\":\"SELECT $1 AS value\",\"params\":[42]}'"
734 .to_string(),
735 ),
736 );
737
738 let mut object = Map::new();
739 object.insert("ok".to_string(), JsonValue::Bool(true));
740 object.insert(
741 "service".to_string(),
742 JsonValue::String("reddb".to_string()),
743 );
744 object.insert(
745 "version".to_string(),
746 JsonValue::String(env!("CARGO_PKG_VERSION").to_string()),
747 );
748 object.insert("endpoints".to_string(), JsonValue::Object(endpoints));
749 object.insert("examples".to_string(), JsonValue::Object(examples));
750 object.insert(
751 "docs".to_string(),
752 JsonValue::String("https://reddb.io/docs".to_string()),
753 );
754 object.insert(
755 "transport_listeners".to_string(),
756 self.transport_readiness_json(),
757 );
758 json_response(200, JsonValue::Object(object))
759 }
760
761 fn health_json_with_transport(&self, report: &HealthReport) -> JsonValue {
762 let mut value = crate::presentation::ops_json::health_json(report);
763 if let JsonValue::Object(ref mut object) = value {
764 object.insert(
765 "transport_listeners".to_string(),
766 self.transport_readiness_json(),
767 );
768 }
769 value
770 }
771
772 pub fn serve(&self) -> io::Result<()> {
773 let listener = TcpListener::bind(&self.options.bind_addr)?;
774 self.serve_on(listener)
775 }
776
777 pub fn serve_on(&self, listener: TcpListener) -> io::Result<()> {
778 for stream in listener.incoming() {
779 match stream {
780 Ok(stream) => match self.http_limiter.try_acquire() {
781 Some(permit) => {
782 let server = self.clone();
784 thread::spawn(move || {
785 let _guard = permit; let _ = server.handle_connection(stream);
787 });
788 }
789 None => {
790 self.http_metrics
795 .record_reject(HttpTransport::Http, HttpRejectReason::CapExhausted);
796 self.reject_with_503(stream, self.options.write_timeout_ms);
797 }
798 },
799 Err(err) => return Err(err),
800 }
801 }
802 Ok(())
803 }
804
    /// Writes the cached 503 rejection response to `stream` and closes it.
    ///
    /// Every step is best-effort: failures are ignored and the remaining
    /// steps still run.
    fn reject_with_503(&self, mut stream: TcpStream, write_timeout_ms: u64) {
        // `set_write_timeout` returns an error for a zero duration; that error is
        // ignored here, so with `write_timeout_ms == 0` no timeout is installed.
        let _ = stream.set_write_timeout(Some(Duration::from_millis(write_timeout_ms)));
        let _ = stream.write_all(&self.reject_503_bytes);
        let _ = stream.flush();
        let _ = stream.shutdown(std::net::Shutdown::Both);
    }
815
816 pub fn serve_one_on(&self, listener: TcpListener) -> io::Result<()> {
817 let (stream, _) = listener.accept()?;
818 self.handle_connection(stream)
819 }
820
    /// Spawns a thread that runs `serve` on a clone of this server and
    /// returns its join handle.
    pub fn serve_in_background(&self) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve())
    }
825
    /// Spawns a thread that runs `serve_on(listener)` on a clone of this
    /// server and returns its join handle.
    pub fn serve_in_background_on(
        &self,
        listener: TcpListener,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_on(listener))
    }
833
834 pub fn serve_tls(&self, tls_config: std::sync::Arc<rustls::ServerConfig>) -> io::Result<()> {
838 let listener = TcpListener::bind(&self.options.bind_addr)?;
839 self.serve_tls_on(listener, tls_config)
840 }
841
842 pub fn serve_tls_on(
843 &self,
844 listener: TcpListener,
845 tls_config: std::sync::Arc<rustls::ServerConfig>,
846 ) -> io::Result<()> {
847 for stream in listener.incoming() {
848 match stream {
849 Ok(stream) => match self.http_limiter.try_acquire() {
850 Some(permit) => {
851 let server = self.clone();
852 let cfg = tls_config.clone();
853 thread::spawn(move || {
854 let _guard = permit; let _ = server.handle_tls_connection(stream, cfg);
856 });
857 }
858 None => {
859 self.http_metrics
865 .record_reject(HttpTransport::Https, HttpRejectReason::CapExhausted);
866 let _ = stream.shutdown(std::net::Shutdown::Both);
867 drop(stream);
868 }
869 },
870 Err(err) => return Err(err),
871 }
872 }
873 Ok(())
874 }
875
    /// Spawns a thread that runs `serve_tls(tls_config)` on a clone of this
    /// server and returns its join handle.
    pub fn serve_tls_in_background(
        &self,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls(tls_config))
    }
883
    /// Spawns a thread that runs `serve_tls_on(listener, tls_config)` on a
    /// clone of this server and returns its join handle.
    pub fn serve_tls_in_background_on(
        &self,
        listener: TcpListener,
        tls_config: std::sync::Arc<rustls::ServerConfig>,
    ) -> thread::JoinHandle<io::Result<()>> {
        let server = self.clone();
        thread::spawn(move || server.serve_tls_on(listener, tls_config))
    }
892
893 fn handle_connection(&self, stream: TcpStream) -> io::Result<()> {
894 let started = Instant::now();
895 let result = self.handle_connection_inner(stream);
896 let elapsed = started.elapsed().as_secs_f64();
897 self.http_metrics
898 .record_duration(HttpTransport::Http, elapsed);
899 result
900 }
901
902 fn handle_connection_inner(&self, mut stream: TcpStream) -> io::Result<()> {
903 stream.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
904 stream.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
905
906 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
914
915 let request = HttpRequest::read_from(&mut stream, self.options.max_body_bytes)?;
916
917 if deadline.expired() {
919 self.http_metrics
920 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
921 Self::write_handler_timeout_503(&mut stream);
922 return Ok(());
923 }
924
925 if self.try_route_streaming(&request, &mut stream)? {
926 return Ok(());
927 }
928 let response = self.route(request);
929
930 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
934 if inject_ms > 0 {
935 thread::sleep(Duration::from_millis(inject_ms));
936 }
937
938 if deadline.expired() {
940 self.http_metrics
941 .record_reject(HttpTransport::Http, HttpRejectReason::HandlerTimeout);
942 Self::write_handler_timeout_503(&mut stream);
943 return Ok(());
944 }
945
946 stream.write_all(&response.to_http_bytes())?;
947 stream.flush()?;
948 Ok(())
949 }
950
951 fn write_handler_timeout_503<S: Write>(stream: &mut S) {
957 const RESPONSE: &[u8] = b"HTTP/1.1 503 Service Unavailable\r\n\
958 Connection: close\r\n\
959 Content-Length: 0\r\n\
960 \r\n";
961 let _ = stream.write_all(RESPONSE);
962 let _ = stream.flush();
963 }
964
965 fn handle_tls_connection(
966 &self,
967 tcp: TcpStream,
968 tls_config: std::sync::Arc<rustls::ServerConfig>,
969 ) -> io::Result<()> {
970 let started = Instant::now();
971 let result = self.handle_tls_connection_inner(tcp, tls_config);
972 let elapsed = started.elapsed().as_secs_f64();
973 self.http_metrics
974 .record_duration(HttpTransport::Https, elapsed);
975 result
976 }
977
978 fn handle_tls_connection_inner(
979 &self,
980 tcp: TcpStream,
981 tls_config: std::sync::Arc<rustls::ServerConfig>,
982 ) -> io::Result<()> {
983 tcp.set_read_timeout(Some(Duration::from_millis(self.options.read_timeout_ms)))?;
984 tcp.set_write_timeout(Some(Duration::from_millis(self.options.write_timeout_ms)))?;
985
986 let deadline = HandlerDeadline::arm(Arc::clone(&self.handler_clock), self.handler_timeout);
990
991 let mut tls_stream = match self::tls::accept_tls(tls_config, tcp) {
992 Ok(s) => s,
993 Err(err) => {
994 tracing::warn!(
995 target: "reddb::http_tls",
996 err = %err,
997 "TLS handshake failed"
998 );
999 return Err(err);
1000 }
1001 };
1002 let request = match HttpRequest::read_from(&mut tls_stream, self.options.max_body_bytes) {
1003 Ok(req) => req,
1004 Err(err) => {
1005 tracing::warn!(
1006 target: "reddb::http_tls",
1007 err = %err,
1008 "TLS request parse failed"
1009 );
1010 return Err(err);
1011 }
1012 };
1013
1014 if deadline.expired() {
1016 self.http_metrics
1017 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1018 Self::write_handler_timeout_503(&mut tls_stream);
1019 return Ok(());
1020 }
1021
1022 if self.try_route_streaming(&request, &mut tls_stream)? {
1023 return Ok(());
1024 }
1025 let response = self.route(request);
1026
1027 let inject_ms = self.slow_inject_ms.load(Ordering::Relaxed);
1030 if inject_ms > 0 {
1031 thread::sleep(Duration::from_millis(inject_ms));
1032 }
1033
1034 if deadline.expired() {
1036 self.http_metrics
1037 .record_reject(HttpTransport::Https, HttpRejectReason::HandlerTimeout);
1038 Self::write_handler_timeout_503(&mut tls_stream);
1039 return Ok(());
1040 }
1041
1042 tls_stream.write_all(&response.to_http_bytes())?;
1043 tls_stream.flush()?;
1044 Ok(())
1045 }
1046}
1047
/// Renders the complete `503 Service Unavailable` response (no body,
/// `Connection: close`) sent when the connection limiter has no permit,
/// with `Retry-After` set to `retry_after_secs`.
fn build_reject_503_bytes(retry_after_secs: u64) -> Vec<u8> {
    let mut response = String::from("HTTP/1.1 503 Service Unavailable\r\n");
    response.push_str("Connection: close\r\n");
    response.push_str("Content-Length: 0\r\n");
    response.push_str("Retry-After: ");
    response.push_str(&retry_after_secs.to_string());
    // End of the header line, then the blank line that ends the header block.
    response.push_str("\r\n\r\n");
    response.into_bytes()
}